| Message ID | 20260401091318.2671624-2-elibr@nvidia.com |
|---|---|
| State | New |
| Delegated to: | Eelco Chaudron |
| Series | netdev-doca |

| Context | Check | Description |
|---|---|---|
| ovsrobot/apply-robot | warning | apply and check: warning |
| ovsrobot/github-robot-_Build_and_Test | fail | github build: failed |
On 1 Apr 2026, at 11:13, Eli Britstein wrote: > From: Gaetan Rivet <gaetanr@nvidia.com> > > Add a way to schedule executions with the RCU using memory embedded > within the object being scheduled, if applicable. > > This way, freeing a high volume of objects does not require many small > allocations, potentially increasing heap fragmentation and memory > pressure. Thanks, Gaetan, for following up on this patch. This embedded version looks way nicer. I have a few comments below. Cheers, Eelco > Signed-off-by: Gaetan Rivet <gaetanr@nvidia.com> > Co-authored-by: Eli Britstein <elibr@nvidia.com> > Signed-off-by: Eli Britstein <elibr@nvidia.com> > --- > lib/guarded-list.c | 10 ++++ > lib/guarded-list.h | 2 + > lib/ovs-rcu.c | 110 ++++++++++++++++++++++--------------- > lib/ovs-rcu.h | 39 ++++++++++++++ > tests/test-rcu.c | 131 +++++++++++++++++++++++++++++++++++++++++++++ > 5 files changed, 249 insertions(+), 43 deletions(-) > > diff --git a/lib/guarded-list.c b/lib/guarded-list.c > index 2186d074e..bb77fb55f 100644 > --- a/lib/guarded-list.c > +++ b/lib/guarded-list.c > @@ -65,6 +65,16 @@ guarded_list_push_back(struct guarded_list *list, > return retval; > } > > +void > +guarded_list_push_back_all(struct guarded_list *list, > + struct ovs_list *nodes, size_t n) > +{ guarded_list_push_back_all() trusts the 'n' parameter matches the actual list length, which could lead to incorrect counts if caller passes wrong value. I would prefer verifying which is probably as expensive as computing it internally. > + ovs_mutex_lock(&list->mutex); > + ovs_list_push_back_all(&list->list, nodes); > + list->n += n; > + ovs_mutex_unlock(&list->mutex); > +} > + > struct ovs_list * > guarded_list_pop_front(struct guarded_list *list) > { > diff --git a/lib/guarded-list.h b/lib/guarded-list.h > index 80ce22c12..b575dc425 100644 > --- a/lib/guarded-list.h > +++ b/lib/guarded-list.h > @@ -40,6 +40,8 @@ bool guarded_list_is_empty(const struct guarded_list *); > > size_t guarded_list_push_back(struct guarded_list *, struct ovs_list *, > size_t max); > +void guarded_list_push_back_all(struct guarded_list *, struct ovs_list *, > + size_t n); > struct ovs_list *guarded_list_pop_front(struct guarded_list *); > size_t guarded_list_pop_all(struct guarded_list *, struct ovs_list *); > > diff --git a/lib/ovs-rcu.c b/lib/ovs-rcu.c > index 49afcc55c..54e6c469d 100644 > --- a/lib/ovs-rcu.c > +++ b/lib/ovs-rcu.c > @@ -38,7 +38,7 @@ struct ovsrcu_cb { > }; > > struct ovsrcu_cbset { > - struct ovs_list list_node; > + struct ovsrcu_node rcu_node; > struct ovsrcu_cb *cbs; > size_t n_allocated; > int n_cbs; > @@ -49,6 +49,8 @@ struct ovsrcu_perthread { > > uint64_t seqno; > struct ovsrcu_cbset *cbset; > + struct ovs_list pending; /* Thread-local list of ovsrcu_node. */ > + size_t n_pending; We might not need this, based on my earlier comment regarding guarded_list_push_back_all(). > char name[16]; /* This thread's name. 
*/ > }; > > @@ -58,15 +60,15 @@ static pthread_key_t perthread_key; > static struct ovs_list ovsrcu_threads; > static struct ovs_mutex ovsrcu_threads_mutex; > > -static struct guarded_list flushed_cbsets; > -static struct seq *flushed_cbsets_seq; > +static struct guarded_list flushed_nodes; > +static struct seq *flushed_nodes_seq; > > static struct latch postpone_exit; > static struct ovs_barrier postpone_barrier; > > static void ovsrcu_init_module(void); > -static void ovsrcu_flush_cbset__(struct ovsrcu_perthread *, bool); > -static void ovsrcu_flush_cbset(struct ovsrcu_perthread *); > +static void ovsrcu_flush_nodes__(struct ovsrcu_perthread *, bool); > +static void ovsrcu_flush_nodes(struct ovsrcu_perthread *); > static void ovsrcu_unregister__(struct ovsrcu_perthread *); > static bool ovsrcu_call_postponed(void); > static void *ovsrcu_postpone_thread(void *arg OVS_UNUSED); > @@ -85,6 +87,8 @@ ovsrcu_perthread_get(void) > perthread = xmalloc(sizeof *perthread); > perthread->seqno = seq_read(global_seqno); > perthread->cbset = NULL; > + ovs_list_init(&perthread->pending); > + perthread->n_pending = 0; > ovs_strlcpy(perthread->name, name[0] ? name : "main", > sizeof perthread->name); > > @@ -153,9 +157,7 @@ ovsrcu_quiesce(void) > > perthread = ovsrcu_perthread_get(); > perthread->seqno = seq_read(global_seqno); > - if (perthread->cbset) { > - ovsrcu_flush_cbset(perthread); > - } > + ovsrcu_flush_nodes(perthread); > seq_change(global_seqno); > > ovsrcu_quiesced(); > @@ -171,9 +173,7 @@ ovsrcu_try_quiesce(void) > perthread = ovsrcu_perthread_get(); > if (!seq_try_lock()) { > perthread->seqno = seq_read(global_seqno); > - if (perthread->cbset) { > - ovsrcu_flush_cbset__(perthread, true); > - } > + ovsrcu_flush_nodes__(perthread, true); > seq_change_protected(global_seqno); > seq_unlock(); > ovsrcu_quiesced(); > @@ -264,10 +264,10 @@ ovsrcu_exit(void) > /* Repeatedly: > * > * - Wait for a grace period. One important side effect is to push the > - * running thread's cbset into 'flushed_cbsets' so that the next call > + * running thread's nodes into 'flushed_nodes' so that the next call > * has something to call. > * > - * - Call all the callbacks in 'flushed_cbsets'. If there aren't any, > + * - Call all the callbacks in 'flushed_nodes'. If there aren't any, > * we're done, otherwise the callbacks themselves might have requested > * more deferred callbacks so we go around again. > * > @@ -282,6 +282,32 @@ ovsrcu_exit(void) > } > } > > +static void > +ovsrcu_run_cbset(void *aux) Maybe the name should be more explicit, for example; ovsrcu_cbset_execute_and_free() You might be missing OVS_NO_SANITIZE_FUNCTION attribute causing the upstream regression failures. > +{ > + struct ovsrcu_cbset *cbset = aux; > + struct ovsrcu_cb *cb; > + > + for (cb = cbset->cbs; cb < &cbset->cbs[cbset->n_cbs]; cb++) { > + cb->function(cb->aux); > + } > + Should this be something like this, as &cbset->cbs[cbset->n_cbs] is undefined behaviour when cbset->cbs is NULL. 
ovs_assert(cbset->n_cbs <= cbset->n_allocated); if (cbset->cbs != NULL && cbset->n_cbs > 0) { for (cb = cbset->cbs; cb < &cbset->cbs[cbset->n_cbs]; cb++) { cb->function(cb->aux); } } But I guess this would even be cleaner: ovs_assert(cbset->n_cbs <= cbset->n_allocated); for (size_t i = 0; i < cbset->n_cbs; i++) { cbset->cbs[i].function(cbset->cbs[i].aux); } > + free(cbset->cbs); > + free(cbset); > +} > + > +void > +ovsrcu_postpone_embedded__(void (*function)(void *aux), void *aux, > + struct ovsrcu_node *rcu_node) > +{ > + struct ovsrcu_perthread *perthread = ovsrcu_perthread_get(); > + > + rcu_node->cb = function; > + rcu_node->aux = aux; > + ovs_list_push_back(&perthread->pending, &rcu_node->list_node); > + perthread->n_pending++; > +} > + > /* Registers 'function' to be called, passing 'aux' as argument, after the > * next grace period. > * > @@ -314,6 +340,7 @@ ovsrcu_postpone__(void (*function)(void *aux), void *aux) > cbset->cbs = xmalloc(MIN_CBS * sizeof *cbset->cbs); > cbset->n_allocated = MIN_CBS; > cbset->n_cbs = 0; > + ovsrcu_postpone_embedded(ovsrcu_run_cbset, cbset, rcu_node); > } > > if (cbset->n_cbs == cbset->n_allocated) { > @@ -329,24 +356,18 @@ ovsrcu_postpone__(void (*function)(void *aux), void *aux) > static bool OVS_NO_SANITIZE_FUNCTION > ovsrcu_call_postponed(void) > { > - struct ovsrcu_cbset *cbset; > - struct ovs_list cbsets; > + struct ovs_list nodes = OVS_LIST_INITIALIZER(&nodes); > + struct ovsrcu_node *node; > > - guarded_list_pop_all(&flushed_cbsets, &cbsets); > - if (ovs_list_is_empty(&cbsets)) { > + guarded_list_pop_all(&flushed_nodes, &nodes); > + if (ovs_list_is_empty(&nodes)) { > return false; > } > > ovsrcu_synchronize(); > > - LIST_FOR_EACH_POP (cbset, list_node, &cbsets) { > - struct ovsrcu_cb *cb; > - > - for (cb = cbset->cbs; cb < &cbset->cbs[cbset->n_cbs]; cb++) { > - cb->function(cb->aux); > - } > - free(cbset->cbs); > - free(cbset); > + LIST_FOR_EACH_POP (node, list_node, &nodes) { > + node->cb(node->aux); > } > > return true; > @@ -358,9 +379,9 @@ ovsrcu_postpone_thread(void *arg OVS_UNUSED) > pthread_detach(pthread_self()); > > while (!latch_is_set(&postpone_exit)) { > - uint64_t seqno = seq_read(flushed_cbsets_seq); > + uint64_t cb_seqno = seq_read(flushed_nodes_seq); > if (!ovsrcu_call_postponed()) { > - seq_wait(flushed_cbsets_seq, seqno); > + seq_wait(flushed_nodes_seq, cb_seqno); > latch_wait(&postpone_exit); > poll_block(); > } > @@ -371,33 +392,36 @@ ovsrcu_postpone_thread(void *arg OVS_UNUSED) > } > > static void > -ovsrcu_flush_cbset__(struct ovsrcu_perthread *perthread, bool protected) > +ovsrcu_flush_nodes__(struct ovsrcu_perthread *perthread, bool protected) > { > - struct ovsrcu_cbset *cbset = perthread->cbset; > + if (ovs_list_is_empty(&perthread->pending)) { > + return; > + } > > - if (cbset) { > - guarded_list_push_back(&flushed_cbsets, &cbset->list_node, SIZE_MAX); > - perthread->cbset = NULL; > + perthread->cbset = NULL; > + guarded_list_push_back_all(&flushed_nodes, &perthread->pending, > + perthread->n_pending); > + ovs_list_init(&perthread->pending); Don't think there is a need to call init. ovs_list_push_back_all() calls ovs_list_splice() which leaves the source list empty, making this ovs_list_init() call redundant. 
> + perthread->n_pending = 0; > > - if (protected) { > - seq_change_protected(flushed_cbsets_seq); > - } else { > - seq_change(flushed_cbsets_seq); > - } > + if (protected) { > + seq_change_protected(flushed_nodes_seq); > + } else { > + seq_change(flushed_nodes_seq); > } > } > > static void > -ovsrcu_flush_cbset(struct ovsrcu_perthread *perthread) > +ovsrcu_flush_nodes(struct ovsrcu_perthread *perthread) > { > - ovsrcu_flush_cbset__(perthread, false); > + ovsrcu_flush_nodes__(perthread, false); > } > > static void > ovsrcu_unregister__(struct ovsrcu_perthread *perthread) > { > - if (perthread->cbset) { > - ovsrcu_flush_cbset(perthread); > + if (!ovs_list_is_empty(&perthread->pending)) { > + ovsrcu_flush_nodes(perthread); > } > > ovs_mutex_lock(&ovsrcu_threads_mutex); > @@ -438,8 +462,8 @@ ovsrcu_init_module(void) > ovs_list_init(&ovsrcu_threads); > ovs_mutex_init(&ovsrcu_threads_mutex); > > - guarded_list_init(&flushed_cbsets); > - flushed_cbsets_seq = seq_create(); > + guarded_list_init(&flushed_nodes); > + flushed_nodes_seq = seq_create(); > > ovsthread_once_done(&once); > } > diff --git a/lib/ovs-rcu.h b/lib/ovs-rcu.h > index a1c15c126..efd43a1a2 100644 > --- a/lib/ovs-rcu.h > +++ b/lib/ovs-rcu.h > @@ -125,6 +125,22 @@ > * ovs_mutex_unlock(&mutex); > * } > * > + * As an alternative to ovsrcu_postpone(), the same deferred execution can be > + * achieved using ovsrcu_postpone_embedded(): > + * > + * struct deferrable { > + * struct ovsrcu_node rcu_node; > + * }; > + * > + * void > + * deferred_free(struct deferrable *d) > + * { > + * ovsrcu_postpone_embedded(free, d, rcu_node); > + * } > + * > + * Using embedded fields can be preferred sometimes to avoid the small > + * allocations done in ovsrcu_postpone(). > + * > * In some rare cases an object may not be addressable with a pointer, but only > * through an array index (e.g. because it's provided by another library). It > * is still possible to have RCU semantics by using the ovsrcu_index type. > @@ -173,6 +189,8 @@ > #include "compiler.h" > #include "ovs-atomic.h" > > +#include "openvswitch/list.h" > + > #if __GNUC__ > #define OVSRCU_TYPE(TYPE) struct { ATOMIC(TYPE) p; } > #define OVSRCU_INITIALIZER(VALUE) { VALUE } > @@ -256,6 +274,27 @@ void ovsrcu_postpone__(void (*function)(void *aux), void *aux); > (void) sizeof(*(ARG)), \ > ovsrcu_postpone__((void (*)(void *))(FUNCTION), ARG)) > > +struct ovsrcu_node { > + struct ovs_list list_node; > + void (*cb)(void *aux); > + void *aux; > +}; > + > +/* Calls FUNCTION passing ARG as its pointer-type argument, which > + * contains an 'ovsrcu_node' as a field named MEMBER. The function Missing double space after period: "MEMBER. The function" should be "MEMBER. The function". > + * is called following the next grace period. See 'Usage' above for an > + * example. Should this comment also mention that the same ovsrcu_node must not be scheduled multiple times? This restriction is not obvious and violating it could cause subtle bugs (use-after-free when the callback executes twice and frees the object twice). > + */ > +void ovsrcu_postpone_embedded__(void (*function)(void *aux), void *aux, > + struct ovsrcu_node *node); > +#define ovsrcu_postpone_embedded(FUNCTION, ARG, MEMBER) \ > + (/* Verify that ARG is appropriate for FUNCTION. */ \ > + (void) sizeof((FUNCTION)(ARG), 1), \ > + /* Verify that ARG is a pointer type. */ \ > + (void) sizeof(*(ARG)), \ > + ovsrcu_postpone_embedded__((void (*)(void *))(FUNCTION), ARG, \ > + &(ARG)->MEMBER)) > + > /* An array index protected by RCU semantics. 
This is an easier alternative to > * an RCU protected pointer to a malloc'd int. */ > typedef struct { atomic_int v; } ovsrcu_index; > diff --git a/tests/test-rcu.c b/tests/test-rcu.c > index bb17092bf..26150e7d9 100644 > --- a/tests/test-rcu.c > +++ b/tests/test-rcu.c > @@ -17,11 +17,16 @@ > #include <config.h> > #undef NDEBUG > #include "fatal-signal.h" > +#include "ovs-atomic.h" > #include "ovs-rcu.h" > #include "ovs-thread.h" > #include "ovstest.h" > +#include "seq.h" > +#include "timeval.h" > #include "util.h" > > +#include "openvswitch/poll-loop.h" > + > static void * > quiescer_main(void *aux OVS_UNUSED) > { > @@ -67,10 +72,136 @@ test_rcu_barrier(void) > ovs_assert(count == 10); > } > > +struct element { > + struct ovsrcu_node rcu_node; > + struct seq *trigger; > + atomic_bool wait; > +}; > + > +static void > +trigger_cb(void *e_) > +{ > + struct element *e = (struct element *) e_; > + > + seq_change(e->trigger); Maybe add a counter in struct element to verify the callback executes exactly once, catching potential double-execution bugs. > +} > + > +static void * > +wait_main(void *aux) > +{ > + struct element *e = aux; > + > + for (;;) { > + bool wait; > + > + atomic_read(&e->wait, &wait); > + if (!wait) { > + break; > + } > + } > + > + seq_wait(e->trigger, seq_read(e->trigger)); > + poll_block(); > + > + return NULL; > +} > + > +static void > +test_rcu_postpone_embedded(bool multithread) > +{ > + long long int timeout; > + pthread_t waiter; > + struct element e; > + uint64_t seqno; > + > + atomic_init(&e.wait, true); > + > + if (multithread) { > + waiter = ovs_thread_create("waiter", wait_main, &e); > + } > + > + e.trigger = seq_create(); > + seqno = seq_read(e.trigger); > + > + ovsrcu_postpone_embedded(trigger_cb, &e, rcu_node); > + > + /* Check that GC holds out until all threads are quiescent. */ > + timeout = time_msec(); > + if (multithread) { > + timeout += 200; > + } > + while (time_msec() <= timeout) { > + ovs_assert(seq_read(e.trigger) == seqno); > + } > + > + atomic_store(&e.wait, false); > + > + seq_wait(e.trigger, seqno); > + poll_timer_wait_until(time_msec() + 200); > + poll_block(); > + > + /* Verify that GC executed. */ > + ovs_assert(seq_read(e.trigger) != seqno); > + seq_destroy(e.trigger); > + > + if (multithread) { > + xpthread_join(waiter, NULL); > + } > +} > + > +#define N_ORDER_CBS 5 > + > +struct order_element { > + struct ovsrcu_node rcu_node; > + int id; > + int *log; > + int *log_idx; > +}; > + > +static void > +order_cb(void *aux) > +{ > + struct order_element *e = aux; > + e->log[(*e->log_idx)++] = e->id; > +} > + > +static void > +test_rcu_ordering(void) > +{ The documentation states "All functions postponed by a single thread are guaranteed to execute in the order they were postponed", the test should verify this for mixed ovsrcu_postpone() and ovsrcu_postpone_embedded() calls. 
> + struct order_element elems[N_ORDER_CBS]; > + int log[N_ORDER_CBS]; > + int log_idx = 0; > + > + for (int i = 0; i < N_ORDER_CBS; i++) { > + elems[i].id = i; > + elems[i].log = log; > + elems[i].log_idx = &log_idx; > + ovsrcu_postpone_embedded(order_cb, &elems[i], rcu_node); > + } > + > + ovsrcu_barrier(); > + > + ovs_assert(log_idx == N_ORDER_CBS); > + for (int i = 0; i < N_ORDER_CBS; i++) { > + if (log[i] != i) { > + ovs_abort(0, "RCU embedded callback ordering violated: " > + "expected cb %d at position %d, got %d", > + i, i, log[i]); > + } > + } > +} > + > static void > test_rcu(int argc OVS_UNUSED, char *argv[] OVS_UNUSED) { > + const bool multithread = true; > + > + /* Execute single-threaded check before spawning additional threads. */ > + test_rcu_postpone_embedded(!multithread); > + test_rcu_postpone_embedded(multithread); > + > test_rcu_quiesce(); > test_rcu_barrier(); > + test_rcu_ordering(); test_rcu_ordering() should be tested in both single-threaded and multi-threaded modes like test_rcu_postpone_embedded() to verify ordering guarantees hold in both RCU execution paths. > } > > OVSTEST_REGISTER("test-rcu", test_rcu);
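To make the intended usage concrete, below is a minimal sketch of the embedded API as documented in the ovs-rcu.h hunk above. The struct and function names ('flow_entry', 'flow_entry_destroy') are hypothetical and used only for illustration; only ovsrcu_postpone_embedded() and struct ovsrcu_node come from the patch.

    /* Hypothetical object that embeds its own RCU node, so deferring its
     * destruction needs none of the small allocations that ovsrcu_postpone()
     * may perform. */
    struct flow_entry {
        struct ovsrcu_node rcu_node;   /* Storage used by the RCU module. */
        /* ... payload ... */
    };

    static void
    flow_entry_destroy(struct flow_entry *fe)
    {
        /* Runs free(fe) after the next grace period.  The callback and its
         * argument are stored in fe->rcu_node, so nothing is allocated here.
         * The same rcu_node must not be scheduled again before it has run. */
        ovsrcu_postpone_embedded(free, fe, rcu_node);
    }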
On 13/04/2026 16:21, Eelco Chaudron wrote: > External email: Use caution opening links or attachments > > > On 1 Apr 2026, at 11:13, Eli Britstein wrote: > >> From: Gaetan Rivet <gaetanr@nvidia.com> >> >> Add a way to schedule executions with the RCU using memory embedded >> within the object being scheduled, if applicable. >> >> This way, freeing a high volume of objects does not require many small >> allocations, potentially increasing heap fragmentation and memory >> pressure. > > Thanks, Gaetan, for following up on this patch. This embedded version looks > way nicer. I have a few comments below. > > Cheers, > > Eelco > >> Signed-off-by: Gaetan Rivet <gaetanr@nvidia.com> >> Co-authored-by: Eli Britstein <elibr@nvidia.com> >> Signed-off-by: Eli Britstein <elibr@nvidia.com> >> --- >> lib/guarded-list.c | 10 ++++ >> lib/guarded-list.h | 2 + >> lib/ovs-rcu.c | 110 ++++++++++++++++++++++--------------- >> lib/ovs-rcu.h | 39 ++++++++++++++ >> tests/test-rcu.c | 131 +++++++++++++++++++++++++++++++++++++++++++++ >> 5 files changed, 249 insertions(+), 43 deletions(-) >> >> diff --git a/lib/guarded-list.c b/lib/guarded-list.c >> index 2186d074e..bb77fb55f 100644 >> --- a/lib/guarded-list.c >> +++ b/lib/guarded-list.c >> @@ -65,6 +65,16 @@ guarded_list_push_back(struct guarded_list *list, >> return retval; >> } >> >> +void >> +guarded_list_push_back_all(struct guarded_list *list, >> + struct ovs_list *nodes, size_t n) >> +{ > > guarded_list_push_back_all() trusts the 'n' parameter matches the actual > list length, which could lead to incorrect counts if caller passes wrong > value. I would prefer verifying which is probably as expensive as > computing it internally. > Ack, I think though it's a shame to have an unbounded list of nodes that we push one by one under lock only to keep the guarded list under a max count, while this max count is not otherwise used. Would you be ok to change to an ovs_list + mutex, and keep the single op move to the flushed nodes? >> + ovs_mutex_lock(&list->mutex); >> + ovs_list_push_back_all(&list->list, nodes); >> + list->n += n; >> + ovs_mutex_unlock(&list->mutex); >> +} >> + >> struct ovs_list * >> guarded_list_pop_front(struct guarded_list *list) >> { >> diff --git a/lib/guarded-list.h b/lib/guarded-list.h >> index 80ce22c12..b575dc425 100644 >> --- a/lib/guarded-list.h >> +++ b/lib/guarded-list.h >> @@ -40,6 +40,8 @@ bool guarded_list_is_empty(const struct guarded_list *); >> >> size_t guarded_list_push_back(struct guarded_list *, struct ovs_list *, >> size_t max); >> +void guarded_list_push_back_all(struct guarded_list *, struct ovs_list *, >> + size_t n); >> struct ovs_list *guarded_list_pop_front(struct guarded_list *); >> size_t guarded_list_pop_all(struct guarded_list *, struct ovs_list *); >> >> diff --git a/lib/ovs-rcu.c b/lib/ovs-rcu.c >> index 49afcc55c..54e6c469d 100644 >> --- a/lib/ovs-rcu.c >> +++ b/lib/ovs-rcu.c >> @@ -38,7 +38,7 @@ struct ovsrcu_cb { >> }; >> >> struct ovsrcu_cbset { >> - struct ovs_list list_node; >> + struct ovsrcu_node rcu_node; >> struct ovsrcu_cb *cbs; >> size_t n_allocated; >> int n_cbs; >> @@ -49,6 +49,8 @@ struct ovsrcu_perthread { >> >> uint64_t seqno; >> struct ovsrcu_cbset *cbset; >> + struct ovs_list pending; /* Thread-local list of ovsrcu_node. */ >> + size_t n_pending; > > We might not need this, based on my earlier comment regarding > guarded_list_push_back_all(). > Ack. >> char name[16]; /* This thread's name. 
*/ >> }; >> >> @@ -58,15 +60,15 @@ static pthread_key_t perthread_key; >> static struct ovs_list ovsrcu_threads; >> static struct ovs_mutex ovsrcu_threads_mutex; >> >> -static struct guarded_list flushed_cbsets; >> -static struct seq *flushed_cbsets_seq; >> +static struct guarded_list flushed_nodes; >> +static struct seq *flushed_nodes_seq; >> >> static struct latch postpone_exit; >> static struct ovs_barrier postpone_barrier; >> >> static void ovsrcu_init_module(void); >> -static void ovsrcu_flush_cbset__(struct ovsrcu_perthread *, bool); >> -static void ovsrcu_flush_cbset(struct ovsrcu_perthread *); >> +static void ovsrcu_flush_nodes__(struct ovsrcu_perthread *, bool); >> +static void ovsrcu_flush_nodes(struct ovsrcu_perthread *); >> static void ovsrcu_unregister__(struct ovsrcu_perthread *); >> static bool ovsrcu_call_postponed(void); >> static void *ovsrcu_postpone_thread(void *arg OVS_UNUSED); >> @@ -85,6 +87,8 @@ ovsrcu_perthread_get(void) >> perthread = xmalloc(sizeof *perthread); >> perthread->seqno = seq_read(global_seqno); >> perthread->cbset = NULL; >> + ovs_list_init(&perthread->pending); >> + perthread->n_pending = 0; >> ovs_strlcpy(perthread->name, name[0] ? name : "main", >> sizeof perthread->name); >> >> @@ -153,9 +157,7 @@ ovsrcu_quiesce(void) >> >> perthread = ovsrcu_perthread_get(); >> perthread->seqno = seq_read(global_seqno); >> - if (perthread->cbset) { >> - ovsrcu_flush_cbset(perthread); >> - } >> + ovsrcu_flush_nodes(perthread); >> seq_change(global_seqno); >> >> ovsrcu_quiesced(); >> @@ -171,9 +173,7 @@ ovsrcu_try_quiesce(void) >> perthread = ovsrcu_perthread_get(); >> if (!seq_try_lock()) { >> perthread->seqno = seq_read(global_seqno); >> - if (perthread->cbset) { >> - ovsrcu_flush_cbset__(perthread, true); >> - } >> + ovsrcu_flush_nodes__(perthread, true); >> seq_change_protected(global_seqno); >> seq_unlock(); >> ovsrcu_quiesced(); >> @@ -264,10 +264,10 @@ ovsrcu_exit(void) >> /* Repeatedly: >> * >> * - Wait for a grace period. One important side effect is to push the >> - * running thread's cbset into 'flushed_cbsets' so that the next call >> + * running thread's nodes into 'flushed_nodes' so that the next call >> * has something to call. >> * >> - * - Call all the callbacks in 'flushed_cbsets'. If there aren't any, >> + * - Call all the callbacks in 'flushed_nodes'. If there aren't any, >> * we're done, otherwise the callbacks themselves might have requested >> * more deferred callbacks so we go around again. >> * >> @@ -282,6 +282,32 @@ ovsrcu_exit(void) >> } >> } >> >> +static void >> +ovsrcu_run_cbset(void *aux) > > Maybe the name should be more explicit, for example; > ovsrcu_cbset_execute_and_free() > > You might be missing OVS_NO_SANITIZE_FUNCTION attribute causing the upstream > regression failures. > Ack. >> +{ >> + struct ovsrcu_cbset *cbset = aux; >> + struct ovsrcu_cb *cb; >> + >> + for (cb = cbset->cbs; cb < &cbset->cbs[cbset->n_cbs]; cb++) { >> + cb->function(cb->aux); >> + } >> + > > Should this be something like this, as &cbset->cbs[cbset->n_cbs] is > undefined behaviour when cbset->cbs is NULL. > > ovs_assert(cbset->n_cbs <= cbset->n_allocated); > > if (cbset->cbs != NULL && cbset->n_cbs > 0) { > for (cb = cbset->cbs; cb < &cbset->cbs[cbset->n_cbs]; cb++) { > cb->function(cb->aux); > } > } > > But I guess this would even be cleaner: > > ovs_assert(cbset->n_cbs <= cbset->n_allocated); > > for (size_t i = 0; i < cbset->n_cbs; i++) { > cbset->cbs[i].function(cbset->cbs[i].aux); > } > Ack. 
>> + free(cbset->cbs); >> + free(cbset); >> +} >> + >> +void >> +ovsrcu_postpone_embedded__(void (*function)(void *aux), void *aux, >> + struct ovsrcu_node *rcu_node) >> +{ >> + struct ovsrcu_perthread *perthread = ovsrcu_perthread_get(); >> + >> + rcu_node->cb = function; >> + rcu_node->aux = aux; >> + ovs_list_push_back(&perthread->pending, &rcu_node->list_node); >> + perthread->n_pending++; >> +} >> + >> /* Registers 'function' to be called, passing 'aux' as argument, after the >> * next grace period. >> * >> @@ -314,6 +340,7 @@ ovsrcu_postpone__(void (*function)(void *aux), void *aux) >> cbset->cbs = xmalloc(MIN_CBS * sizeof *cbset->cbs); >> cbset->n_allocated = MIN_CBS; >> cbset->n_cbs = 0; >> + ovsrcu_postpone_embedded(ovsrcu_run_cbset, cbset, rcu_node); >> } >> >> if (cbset->n_cbs == cbset->n_allocated) { >> @@ -329,24 +356,18 @@ ovsrcu_postpone__(void (*function)(void *aux), void *aux) >> static bool OVS_NO_SANITIZE_FUNCTION >> ovsrcu_call_postponed(void) >> { >> - struct ovsrcu_cbset *cbset; >> - struct ovs_list cbsets; >> + struct ovs_list nodes = OVS_LIST_INITIALIZER(&nodes); >> + struct ovsrcu_node *node; >> >> - guarded_list_pop_all(&flushed_cbsets, &cbsets); >> - if (ovs_list_is_empty(&cbsets)) { >> + guarded_list_pop_all(&flushed_nodes, &nodes); >> + if (ovs_list_is_empty(&nodes)) { >> return false; >> } >> >> ovsrcu_synchronize(); >> >> - LIST_FOR_EACH_POP (cbset, list_node, &cbsets) { >> - struct ovsrcu_cb *cb; >> - >> - for (cb = cbset->cbs; cb < &cbset->cbs[cbset->n_cbs]; cb++) { >> - cb->function(cb->aux); >> - } >> - free(cbset->cbs); >> - free(cbset); >> + LIST_FOR_EACH_POP (node, list_node, &nodes) { >> + node->cb(node->aux); >> } >> >> return true; >> @@ -358,9 +379,9 @@ ovsrcu_postpone_thread(void *arg OVS_UNUSED) >> pthread_detach(pthread_self()); >> >> while (!latch_is_set(&postpone_exit)) { >> - uint64_t seqno = seq_read(flushed_cbsets_seq); >> + uint64_t cb_seqno = seq_read(flushed_nodes_seq); >> if (!ovsrcu_call_postponed()) { >> - seq_wait(flushed_cbsets_seq, seqno); >> + seq_wait(flushed_nodes_seq, cb_seqno); >> latch_wait(&postpone_exit); >> poll_block(); >> } >> @@ -371,33 +392,36 @@ ovsrcu_postpone_thread(void *arg OVS_UNUSED) >> } >> >> static void >> -ovsrcu_flush_cbset__(struct ovsrcu_perthread *perthread, bool protected) >> +ovsrcu_flush_nodes__(struct ovsrcu_perthread *perthread, bool protected) >> { >> - struct ovsrcu_cbset *cbset = perthread->cbset; >> + if (ovs_list_is_empty(&perthread->pending)) { >> + return; >> + } >> >> - if (cbset) { >> - guarded_list_push_back(&flushed_cbsets, &cbset->list_node, SIZE_MAX); >> - perthread->cbset = NULL; >> + perthread->cbset = NULL; >> + guarded_list_push_back_all(&flushed_nodes, &perthread->pending, >> + perthread->n_pending); >> + ovs_list_init(&perthread->pending); > > Don't think there is a need to call init. ovs_list_push_back_all() calls > ovs_list_splice() which leaves the source list empty, making this > ovs_list_init() call redundant. > Ack. 
>> + perthread->n_pending = 0; >> >> - if (protected) { >> - seq_change_protected(flushed_cbsets_seq); >> - } else { >> - seq_change(flushed_cbsets_seq); >> - } >> + if (protected) { >> + seq_change_protected(flushed_nodes_seq); >> + } else { >> + seq_change(flushed_nodes_seq); >> } >> } >> >> static void >> -ovsrcu_flush_cbset(struct ovsrcu_perthread *perthread) >> +ovsrcu_flush_nodes(struct ovsrcu_perthread *perthread) >> { >> - ovsrcu_flush_cbset__(perthread, false); >> + ovsrcu_flush_nodes__(perthread, false); >> } >> >> static void >> ovsrcu_unregister__(struct ovsrcu_perthread *perthread) >> { >> - if (perthread->cbset) { >> - ovsrcu_flush_cbset(perthread); >> + if (!ovs_list_is_empty(&perthread->pending)) { >> + ovsrcu_flush_nodes(perthread); >> } >> >> ovs_mutex_lock(&ovsrcu_threads_mutex); >> @@ -438,8 +462,8 @@ ovsrcu_init_module(void) >> ovs_list_init(&ovsrcu_threads); >> ovs_mutex_init(&ovsrcu_threads_mutex); >> >> - guarded_list_init(&flushed_cbsets); >> - flushed_cbsets_seq = seq_create(); >> + guarded_list_init(&flushed_nodes); >> + flushed_nodes_seq = seq_create(); >> >> ovsthread_once_done(&once); >> } >> diff --git a/lib/ovs-rcu.h b/lib/ovs-rcu.h >> index a1c15c126..efd43a1a2 100644 >> --- a/lib/ovs-rcu.h >> +++ b/lib/ovs-rcu.h >> @@ -125,6 +125,22 @@ >> * ovs_mutex_unlock(&mutex); >> * } >> * >> + * As an alternative to ovsrcu_postpone(), the same deferred execution can be >> + * achieved using ovsrcu_postpone_embedded(): >> + * >> + * struct deferrable { >> + * struct ovsrcu_node rcu_node; >> + * }; >> + * >> + * void >> + * deferred_free(struct deferrable *d) >> + * { >> + * ovsrcu_postpone_embedded(free, d, rcu_node); >> + * } >> + * >> + * Using embedded fields can be preferred sometimes to avoid the small >> + * allocations done in ovsrcu_postpone(). >> + * >> * In some rare cases an object may not be addressable with a pointer, but only >> * through an array index (e.g. because it's provided by another library). It >> * is still possible to have RCU semantics by using the ovsrcu_index type. >> @@ -173,6 +189,8 @@ >> #include "compiler.h" >> #include "ovs-atomic.h" >> >> +#include "openvswitch/list.h" >> + >> #if __GNUC__ >> #define OVSRCU_TYPE(TYPE) struct { ATOMIC(TYPE) p; } >> #define OVSRCU_INITIALIZER(VALUE) { VALUE } >> @@ -256,6 +274,27 @@ void ovsrcu_postpone__(void (*function)(void *aux), void *aux); >> (void) sizeof(*(ARG)), \ >> ovsrcu_postpone__((void (*)(void *))(FUNCTION), ARG)) >> >> +struct ovsrcu_node { >> + struct ovs_list list_node; >> + void (*cb)(void *aux); >> + void *aux; >> +}; >> + >> +/* Calls FUNCTION passing ARG as its pointer-type argument, which >> + * contains an 'ovsrcu_node' as a field named MEMBER. The function > > Missing double space after period: "MEMBER. The function" should be > "MEMBER. The function". > Ack. >> + * is called following the next grace period. See 'Usage' above for an >> + * example. > > Should this comment also mention that the same ovsrcu_node must not be > scheduled multiple times? This restriction is not obvious and violating > it could cause subtle bugs (use-after-free when the callback executes > twice and frees the object twice). > Yes it's an important point, will add it. If we are ok with sparing a bit of memory, maybe we can maintain a flag within the node to assert no double-free? It sounds like a user error though, so not sure it's worth making all nodes bigger to detect it. 
>> + */ >> +void ovsrcu_postpone_embedded__(void (*function)(void *aux), void *aux, >> + struct ovsrcu_node *node); >> +#define ovsrcu_postpone_embedded(FUNCTION, ARG, MEMBER) \ >> + (/* Verify that ARG is appropriate for FUNCTION. */ \ >> + (void) sizeof((FUNCTION)(ARG), 1), \ >> + /* Verify that ARG is a pointer type. */ \ >> + (void) sizeof(*(ARG)), \ >> + ovsrcu_postpone_embedded__((void (*)(void *))(FUNCTION), ARG, \ >> + &(ARG)->MEMBER)) >> + >> /* An array index protected by RCU semantics. This is an easier alternative to >> * an RCU protected pointer to a malloc'd int. */ >> typedef struct { atomic_int v; } ovsrcu_index; >> diff --git a/tests/test-rcu.c b/tests/test-rcu.c >> index bb17092bf..26150e7d9 100644 >> --- a/tests/test-rcu.c >> +++ b/tests/test-rcu.c >> @@ -17,11 +17,16 @@ >> #include <config.h> >> #undef NDEBUG >> #include "fatal-signal.h" >> +#include "ovs-atomic.h" >> #include "ovs-rcu.h" >> #include "ovs-thread.h" >> #include "ovstest.h" >> +#include "seq.h" >> +#include "timeval.h" >> #include "util.h" >> >> +#include "openvswitch/poll-loop.h" >> + >> static void * >> quiescer_main(void *aux OVS_UNUSED) >> { >> @@ -67,10 +72,136 @@ test_rcu_barrier(void) >> ovs_assert(count == 10); >> } >> >> +struct element { >> + struct ovsrcu_node rcu_node; >> + struct seq *trigger; >> + atomic_bool wait; >> +}; >> + >> +static void >> +trigger_cb(void *e_) >> +{ >> + struct element *e = (struct element *) e_; >> + >> + seq_change(e->trigger); > > Maybe add a counter in struct element to verify the callback > executes exactly once, catching potential double-execution bugs. > Ack. >> +} >> + >> +static void * >> +wait_main(void *aux) >> +{ >> + struct element *e = aux; >> + >> + for (;;) { >> + bool wait; >> + >> + atomic_read(&e->wait, &wait); >> + if (!wait) { >> + break; >> + } >> + } >> + >> + seq_wait(e->trigger, seq_read(e->trigger)); >> + poll_block(); >> + >> + return NULL; >> +} >> + >> +static void >> +test_rcu_postpone_embedded(bool multithread) >> +{ >> + long long int timeout; >> + pthread_t waiter; >> + struct element e; >> + uint64_t seqno; >> + >> + atomic_init(&e.wait, true); >> + >> + if (multithread) { >> + waiter = ovs_thread_create("waiter", wait_main, &e); >> + } >> + >> + e.trigger = seq_create(); >> + seqno = seq_read(e.trigger); >> + >> + ovsrcu_postpone_embedded(trigger_cb, &e, rcu_node); >> + >> + /* Check that GC holds out until all threads are quiescent. */ >> + timeout = time_msec(); >> + if (multithread) { >> + timeout += 200; >> + } >> + while (time_msec() <= timeout) { >> + ovs_assert(seq_read(e.trigger) == seqno); >> + } >> + >> + atomic_store(&e.wait, false); >> + >> + seq_wait(e.trigger, seqno); >> + poll_timer_wait_until(time_msec() + 200); >> + poll_block(); >> + >> + /* Verify that GC executed. */ >> + ovs_assert(seq_read(e.trigger) != seqno); >> + seq_destroy(e.trigger); >> + >> + if (multithread) { >> + xpthread_join(waiter, NULL); >> + } >> +} >> + >> +#define N_ORDER_CBS 5 >> + >> +struct order_element { >> + struct ovsrcu_node rcu_node; >> + int id; >> + int *log; >> + int *log_idx; >> +}; >> + >> +static void >> +order_cb(void *aux) >> +{ >> + struct order_element *e = aux; >> + e->log[(*e->log_idx)++] = e->id; >> +} >> + >> +static void >> +test_rcu_ordering(void) >> +{ > > The documentation states "All functions postponed by a single thread are > guaranteed to execute in the order they were postponed", the test should > verify this for mixed ovsrcu_postpone() and ovsrcu_postpone_embedded() > calls. 
> Currently, ordering is only guaranteed within each 'scheduling types' (cbset or embedded). This is a simpler implementation. I can either 1. Modify the doc to specify this point. or 2. Modify the scheduling such that postponing into a cbset will trigger creating a new cbset if an embedded not is already at the tail of the thread-local node list: this way, each cbset executes in the right order, at the cost of potentially many cbset under-utilized (wasted of memory). I am assuming you will prefer (2.), if not let me know. >> + struct order_element elems[N_ORDER_CBS]; >> + int log[N_ORDER_CBS]; >> + int log_idx = 0; >> + >> + for (int i = 0; i < N_ORDER_CBS; i++) { >> + elems[i].id = i; >> + elems[i].log = log; >> + elems[i].log_idx = &log_idx; >> + ovsrcu_postpone_embedded(order_cb, &elems[i], rcu_node); >> + } >> + >> + ovsrcu_barrier(); >> + >> + ovs_assert(log_idx == N_ORDER_CBS); >> + for (int i = 0; i < N_ORDER_CBS; i++) { >> + if (log[i] != i) { >> + ovs_abort(0, "RCU embedded callback ordering violated: " >> + "expected cb %d at position %d, got %d", >> + i, i, log[i]); >> + } >> + } >> +} >> + >> static void >> test_rcu(int argc OVS_UNUSED, char *argv[] OVS_UNUSED) { >> + const bool multithread = true; >> + >> + /* Execute single-threaded check before spawning additional threads. */ >> + test_rcu_postpone_embedded(!multithread); >> + test_rcu_postpone_embedded(multithread); >> + >> test_rcu_quiesce(); >> test_rcu_barrier(); >> + test_rcu_ordering(); > > test_rcu_ordering() should be tested in both single-threaded and > multi-threaded modes like test_rcu_postpone_embedded() to verify > ordering guarantees hold in both RCU execution paths. > Ack. >> } >> >> OVSTEST_REGISTER("test-rcu", test_rcu); >
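Folding together the points acked in the reply above (the more explicit name, the indexed loop that avoids forming &cbset->cbs[n_cbs] when 'cbs' is NULL, and the OVS_NO_SANITIZE_FUNCTION attribute), the cbset runner in the next revision might look roughly like this; a sketch of the suggested changes, not the final code:

    /* Executes and frees a cbset.  ovsrcu_postpone__() schedules it through
     * the rcu_node embedded in the cbset itself, e.g.
     * ovsrcu_postpone_embedded(ovsrcu_cbset_execute_and_free, cbset, rcu_node). */
    static void OVS_NO_SANITIZE_FUNCTION
    ovsrcu_cbset_execute_and_free(void *aux)
    {
        struct ovsrcu_cbset *cbset = aux;

        ovs_assert(cbset->n_cbs <= cbset->n_allocated);

        /* Indexing avoids taking an address one past a NULL 'cbs' array. */
        for (int i = 0; i < cbset->n_cbs; i++) {
            cbset->cbs[i].function(cbset->cbs[i].aux);
        }

        free(cbset->cbs);
        free(cbset);
    }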
On 14 Apr 2026, at 15:25, Gaetan Rivet wrote: > On 13/04/2026 16:21, Eelco Chaudron wrote: >> External email: Use caution opening links or attachments >> >> >> On 1 Apr 2026, at 11:13, Eli Britstein wrote: >> >>> From: Gaetan Rivet <gaetanr@nvidia.com> >>> >>> Add a way to schedule executions with the RCU using memory embedded >>> within the object being scheduled, if applicable. >>> >>> This way, freeing a high volume of objects does not require many small >>> allocations, potentially increasing heap fragmentation and memory >>> pressure. >> >> Thanks, Gaetan, for following up on this patch. This embedded version looks >> way nicer. I have a few comments below. >> >> Cheers, >> >> Eelco >> >>> Signed-off-by: Gaetan Rivet <gaetanr@nvidia.com> >>> Co-authored-by: Eli Britstein <elibr@nvidia.com> >>> Signed-off-by: Eli Britstein <elibr@nvidia.com> >>> --- >>> lib/guarded-list.c | 10 ++++ >>> lib/guarded-list.h | 2 + >>> lib/ovs-rcu.c | 110 ++++++++++++++++++++++--------------- >>> lib/ovs-rcu.h | 39 ++++++++++++++ >>> tests/test-rcu.c | 131 +++++++++++++++++++++++++++++++++++++++++++++ >>> 5 files changed, 249 insertions(+), 43 deletions(-) >>> >>> diff --git a/lib/guarded-list.c b/lib/guarded-list.c >>> index 2186d074e..bb77fb55f 100644 >>> --- a/lib/guarded-list.c >>> +++ b/lib/guarded-list.c >>> @@ -65,6 +65,16 @@ guarded_list_push_back(struct guarded_list *list, >>> return retval; >>> } >>> >>> +void >>> +guarded_list_push_back_all(struct guarded_list *list, >>> + struct ovs_list *nodes, size_t n) >>> +{ >> >> guarded_list_push_back_all() trusts the 'n' parameter matches the actual >> list length, which could lead to incorrect counts if caller passes wrong >> value. I would prefer verifying which is probably as expensive as >> computing it internally. >> > > Ack, I think though it's a shame to have an unbounded list of nodes that > we push one by one under lock only to keep the guarded list under a max > count, while this max count is not otherwise used. Yes, I agree something like this would work, but adds overhead. { n = ovs_list_size(nodes); ovs_mutex_lock(&list->mutex); ovs_list_push_back_all(&list->list, nodes); list->n += n; ovs_mutex_unlock(&list->mutex); } > Would you be ok to change to an ovs_list + mutex, and keep the single op > move to the flushed nodes? I guess that would work, wish the guarded_list API had a non-count option. But found no clean way to add/do it. 
>>> + ovs_mutex_lock(&list->mutex); >>> + ovs_list_push_back_all(&list->list, nodes); >>> + list->n += n; >>> + ovs_mutex_unlock(&list->mutex); >>> +} >>> + >>> struct ovs_list * >>> guarded_list_pop_front(struct guarded_list *list) >>> { >>> diff --git a/lib/guarded-list.h b/lib/guarded-list.h >>> index 80ce22c12..b575dc425 100644 >>> --- a/lib/guarded-list.h >>> +++ b/lib/guarded-list.h >>> @@ -40,6 +40,8 @@ bool guarded_list_is_empty(const struct guarded_list *); >>> >>> size_t guarded_list_push_back(struct guarded_list *, struct ovs_list *, >>> size_t max); >>> +void guarded_list_push_back_all(struct guarded_list *, struct ovs_list *, >>> + size_t n); >>> struct ovs_list *guarded_list_pop_front(struct guarded_list *); >>> size_t guarded_list_pop_all(struct guarded_list *, struct ovs_list *); >>> >>> diff --git a/lib/ovs-rcu.c b/lib/ovs-rcu.c >>> index 49afcc55c..54e6c469d 100644 >>> --- a/lib/ovs-rcu.c >>> +++ b/lib/ovs-rcu.c >>> @@ -38,7 +38,7 @@ struct ovsrcu_cb { >>> }; >>> >>> struct ovsrcu_cbset { >>> - struct ovs_list list_node; >>> + struct ovsrcu_node rcu_node; >>> struct ovsrcu_cb *cbs; >>> size_t n_allocated; >>> int n_cbs; >>> @@ -49,6 +49,8 @@ struct ovsrcu_perthread { >>> >>> uint64_t seqno; >>> struct ovsrcu_cbset *cbset; >>> + struct ovs_list pending; /* Thread-local list of ovsrcu_node. */ >>> + size_t n_pending; >> >> We might not need this, based on my earlier comment regarding >> guarded_list_push_back_all(). >> > > Ack. > >>> char name[16]; /* This thread's name. */ >>> }; >>> >>> @@ -58,15 +60,15 @@ static pthread_key_t perthread_key; >>> static struct ovs_list ovsrcu_threads; >>> static struct ovs_mutex ovsrcu_threads_mutex; >>> >>> -static struct guarded_list flushed_cbsets; >>> -static struct seq *flushed_cbsets_seq; >>> +static struct guarded_list flushed_nodes; >>> +static struct seq *flushed_nodes_seq; >>> >>> static struct latch postpone_exit; >>> static struct ovs_barrier postpone_barrier; >>> >>> static void ovsrcu_init_module(void); >>> -static void ovsrcu_flush_cbset__(struct ovsrcu_perthread *, bool); >>> -static void ovsrcu_flush_cbset(struct ovsrcu_perthread *); >>> +static void ovsrcu_flush_nodes__(struct ovsrcu_perthread *, bool); >>> +static void ovsrcu_flush_nodes(struct ovsrcu_perthread *); >>> static void ovsrcu_unregister__(struct ovsrcu_perthread *); >>> static bool ovsrcu_call_postponed(void); >>> static void *ovsrcu_postpone_thread(void *arg OVS_UNUSED); >>> @@ -85,6 +87,8 @@ ovsrcu_perthread_get(void) >>> perthread = xmalloc(sizeof *perthread); >>> perthread->seqno = seq_read(global_seqno); >>> perthread->cbset = NULL; >>> + ovs_list_init(&perthread->pending); >>> + perthread->n_pending = 0; >>> ovs_strlcpy(perthread->name, name[0] ? 
name : "main", >>> sizeof perthread->name); >>> >>> @@ -153,9 +157,7 @@ ovsrcu_quiesce(void) >>> >>> perthread = ovsrcu_perthread_get(); >>> perthread->seqno = seq_read(global_seqno); >>> - if (perthread->cbset) { >>> - ovsrcu_flush_cbset(perthread); >>> - } >>> + ovsrcu_flush_nodes(perthread); >>> seq_change(global_seqno); >>> >>> ovsrcu_quiesced(); >>> @@ -171,9 +173,7 @@ ovsrcu_try_quiesce(void) >>> perthread = ovsrcu_perthread_get(); >>> if (!seq_try_lock()) { >>> perthread->seqno = seq_read(global_seqno); >>> - if (perthread->cbset) { >>> - ovsrcu_flush_cbset__(perthread, true); >>> - } >>> + ovsrcu_flush_nodes__(perthread, true); >>> seq_change_protected(global_seqno); >>> seq_unlock(); >>> ovsrcu_quiesced(); >>> @@ -264,10 +264,10 @@ ovsrcu_exit(void) >>> /* Repeatedly: >>> * >>> * - Wait for a grace period. One important side effect is to push the >>> - * running thread's cbset into 'flushed_cbsets' so that the next call >>> + * running thread's nodes into 'flushed_nodes' so that the next call >>> * has something to call. >>> * >>> - * - Call all the callbacks in 'flushed_cbsets'. If there aren't any, >>> + * - Call all the callbacks in 'flushed_nodes'. If there aren't any, >>> * we're done, otherwise the callbacks themselves might have requested >>> * more deferred callbacks so we go around again. >>> * >>> @@ -282,6 +282,32 @@ ovsrcu_exit(void) >>> } >>> } >>> >>> +static void >>> +ovsrcu_run_cbset(void *aux) >> >> Maybe the name should be more explicit, for example; >> ovsrcu_cbset_execute_and_free() >> >> You might be missing OVS_NO_SANITIZE_FUNCTION attribute causing the upstream >> regression failures. >> > > Ack. > >>> +{ >>> + struct ovsrcu_cbset *cbset = aux; >>> + struct ovsrcu_cb *cb; >>> + >>> + for (cb = cbset->cbs; cb < &cbset->cbs[cbset->n_cbs]; cb++) { >>> + cb->function(cb->aux); >>> + } >>> + >> >> Should this be something like this, as &cbset->cbs[cbset->n_cbs] is >> undefined behaviour when cbset->cbs is NULL. >> >> ovs_assert(cbset->n_cbs <= cbset->n_allocated); >> >> if (cbset->cbs != NULL && cbset->n_cbs > 0) { >> for (cb = cbset->cbs; cb < &cbset->cbs[cbset->n_cbs]; cb++) { >> cb->function(cb->aux); >> } >> } >> >> But I guess this would even be cleaner: >> >> ovs_assert(cbset->n_cbs <= cbset->n_allocated); >> >> for (size_t i = 0; i < cbset->n_cbs; i++) { >> cbset->cbs[i].function(cbset->cbs[i].aux); >> } >> > > Ack. > >>> + free(cbset->cbs); >>> + free(cbset); >>> +} >>> + >>> +void >>> +ovsrcu_postpone_embedded__(void (*function)(void *aux), void *aux, >>> + struct ovsrcu_node *rcu_node) >>> +{ >>> + struct ovsrcu_perthread *perthread = ovsrcu_perthread_get(); >>> + >>> + rcu_node->cb = function; >>> + rcu_node->aux = aux; >>> + ovs_list_push_back(&perthread->pending, &rcu_node->list_node); >>> + perthread->n_pending++; >>> +} >>> + >>> /* Registers 'function' to be called, passing 'aux' as argument, after the >>> * next grace period. 
>>> * >>> @@ -314,6 +340,7 @@ ovsrcu_postpone__(void (*function)(void *aux), void *aux) >>> cbset->cbs = xmalloc(MIN_CBS * sizeof *cbset->cbs); >>> cbset->n_allocated = MIN_CBS; >>> cbset->n_cbs = 0; >>> + ovsrcu_postpone_embedded(ovsrcu_run_cbset, cbset, rcu_node); >>> } >>> >>> if (cbset->n_cbs == cbset->n_allocated) { >>> @@ -329,24 +356,18 @@ ovsrcu_postpone__(void (*function)(void *aux), void *aux) >>> static bool OVS_NO_SANITIZE_FUNCTION >>> ovsrcu_call_postponed(void) >>> { >>> - struct ovsrcu_cbset *cbset; >>> - struct ovs_list cbsets; >>> + struct ovs_list nodes = OVS_LIST_INITIALIZER(&nodes); >>> + struct ovsrcu_node *node; >>> >>> - guarded_list_pop_all(&flushed_cbsets, &cbsets); >>> - if (ovs_list_is_empty(&cbsets)) { >>> + guarded_list_pop_all(&flushed_nodes, &nodes); >>> + if (ovs_list_is_empty(&nodes)) { >>> return false; >>> } >>> >>> ovsrcu_synchronize(); >>> >>> - LIST_FOR_EACH_POP (cbset, list_node, &cbsets) { >>> - struct ovsrcu_cb *cb; >>> - >>> - for (cb = cbset->cbs; cb < &cbset->cbs[cbset->n_cbs]; cb++) { >>> - cb->function(cb->aux); >>> - } >>> - free(cbset->cbs); >>> - free(cbset); >>> + LIST_FOR_EACH_POP (node, list_node, &nodes) { >>> + node->cb(node->aux); >>> } >>> >>> return true; >>> @@ -358,9 +379,9 @@ ovsrcu_postpone_thread(void *arg OVS_UNUSED) >>> pthread_detach(pthread_self()); >>> >>> while (!latch_is_set(&postpone_exit)) { >>> - uint64_t seqno = seq_read(flushed_cbsets_seq); >>> + uint64_t cb_seqno = seq_read(flushed_nodes_seq); >>> if (!ovsrcu_call_postponed()) { >>> - seq_wait(flushed_cbsets_seq, seqno); >>> + seq_wait(flushed_nodes_seq, cb_seqno); >>> latch_wait(&postpone_exit); >>> poll_block(); >>> } >>> @@ -371,33 +392,36 @@ ovsrcu_postpone_thread(void *arg OVS_UNUSED) >>> } >>> >>> static void >>> -ovsrcu_flush_cbset__(struct ovsrcu_perthread *perthread, bool protected) >>> +ovsrcu_flush_nodes__(struct ovsrcu_perthread *perthread, bool protected) >>> { >>> - struct ovsrcu_cbset *cbset = perthread->cbset; >>> + if (ovs_list_is_empty(&perthread->pending)) { >>> + return; >>> + } >>> >>> - if (cbset) { >>> - guarded_list_push_back(&flushed_cbsets, &cbset->list_node, SIZE_MAX); >>> - perthread->cbset = NULL; >>> + perthread->cbset = NULL; >>> + guarded_list_push_back_all(&flushed_nodes, &perthread->pending, >>> + perthread->n_pending); >>> + ovs_list_init(&perthread->pending); >> >> Don't think there is a need to call init. ovs_list_push_back_all() calls >> ovs_list_splice() which leaves the source list empty, making this >> ovs_list_init() call redundant. >> > > Ack. 
> >>> + perthread->n_pending = 0; >>> >>> - if (protected) { >>> - seq_change_protected(flushed_cbsets_seq); >>> - } else { >>> - seq_change(flushed_cbsets_seq); >>> - } >>> + if (protected) { >>> + seq_change_protected(flushed_nodes_seq); >>> + } else { >>> + seq_change(flushed_nodes_seq); >>> } >>> } >>> >>> static void >>> -ovsrcu_flush_cbset(struct ovsrcu_perthread *perthread) >>> +ovsrcu_flush_nodes(struct ovsrcu_perthread *perthread) >>> { >>> - ovsrcu_flush_cbset__(perthread, false); >>> + ovsrcu_flush_nodes__(perthread, false); >>> } >>> >>> static void >>> ovsrcu_unregister__(struct ovsrcu_perthread *perthread) >>> { >>> - if (perthread->cbset) { >>> - ovsrcu_flush_cbset(perthread); >>> + if (!ovs_list_is_empty(&perthread->pending)) { >>> + ovsrcu_flush_nodes(perthread); >>> } >>> >>> ovs_mutex_lock(&ovsrcu_threads_mutex); >>> @@ -438,8 +462,8 @@ ovsrcu_init_module(void) >>> ovs_list_init(&ovsrcu_threads); >>> ovs_mutex_init(&ovsrcu_threads_mutex); >>> >>> - guarded_list_init(&flushed_cbsets); >>> - flushed_cbsets_seq = seq_create(); >>> + guarded_list_init(&flushed_nodes); >>> + flushed_nodes_seq = seq_create(); >>> >>> ovsthread_once_done(&once); >>> } >>> diff --git a/lib/ovs-rcu.h b/lib/ovs-rcu.h >>> index a1c15c126..efd43a1a2 100644 >>> --- a/lib/ovs-rcu.h >>> +++ b/lib/ovs-rcu.h >>> @@ -125,6 +125,22 @@ >>> * ovs_mutex_unlock(&mutex); >>> * } >>> * >>> + * As an alternative to ovsrcu_postpone(), the same deferred execution can be >>> + * achieved using ovsrcu_postpone_embedded(): >>> + * >>> + * struct deferrable { >>> + * struct ovsrcu_node rcu_node; >>> + * }; >>> + * >>> + * void >>> + * deferred_free(struct deferrable *d) >>> + * { >>> + * ovsrcu_postpone_embedded(free, d, rcu_node); >>> + * } >>> + * >>> + * Using embedded fields can be preferred sometimes to avoid the small >>> + * allocations done in ovsrcu_postpone(). >>> + * >>> * In some rare cases an object may not be addressable with a pointer, but only >>> * through an array index (e.g. because it's provided by another library). It >>> * is still possible to have RCU semantics by using the ovsrcu_index type. >>> @@ -173,6 +189,8 @@ >>> #include "compiler.h" >>> #include "ovs-atomic.h" >>> >>> +#include "openvswitch/list.h" >>> + >>> #if __GNUC__ >>> #define OVSRCU_TYPE(TYPE) struct { ATOMIC(TYPE) p; } >>> #define OVSRCU_INITIALIZER(VALUE) { VALUE } >>> @@ -256,6 +274,27 @@ void ovsrcu_postpone__(void (*function)(void *aux), void *aux); >>> (void) sizeof(*(ARG)), \ >>> ovsrcu_postpone__((void (*)(void *))(FUNCTION), ARG)) >>> >>> +struct ovsrcu_node { >>> + struct ovs_list list_node; >>> + void (*cb)(void *aux); >>> + void *aux; >>> +}; >>> + >>> +/* Calls FUNCTION passing ARG as its pointer-type argument, which >>> + * contains an 'ovsrcu_node' as a field named MEMBER. The function >> >> Missing double space after period: "MEMBER. The function" should be >> "MEMBER. The function". >> > > Ack. > >>> + * is called following the next grace period. See 'Usage' above for an >>> + * example. >> >> Should this comment also mention that the same ovsrcu_node must not be >> scheduled multiple times? This restriction is not obvious and violating >> it could cause subtle bugs (use-after-free when the callback executes >> twice and frees the object twice). >> > > Yes it's an important point, will add it. > If we are ok with sparing a bit of memory, maybe we can maintain a flag > within the node to assert no double-free? > > It sounds like a user error though, so not sure it's worth making all > nodes bigger to detect it. 
Agree, we should try to avoid all these kinds of extra checks. If we ever need it in the future, we could add it under an RCU_DEBUG flag, similar to how the kernel handles debug assertions. >>> + */ >>> +void ovsrcu_postpone_embedded__(void (*function)(void *aux), void *aux, >>> + struct ovsrcu_node *node); >>> +#define ovsrcu_postpone_embedded(FUNCTION, ARG, MEMBER) \ >>> + (/* Verify that ARG is appropriate for FUNCTION. */ \ >>> + (void) sizeof((FUNCTION)(ARG), 1), \ >>> + /* Verify that ARG is a pointer type. */ \ >>> + (void) sizeof(*(ARG)), \ >>> + ovsrcu_postpone_embedded__((void (*)(void *))(FUNCTION), ARG, \ >>> + &(ARG)->MEMBER)) >>> + >>> /* An array index protected by RCU semantics. This is an easier alternative to >>> * an RCU protected pointer to a malloc'd int. */ >>> typedef struct { atomic_int v; } ovsrcu_index; >>> diff --git a/tests/test-rcu.c b/tests/test-rcu.c >>> index bb17092bf..26150e7d9 100644 >>> --- a/tests/test-rcu.c >>> +++ b/tests/test-rcu.c >>> @@ -17,11 +17,16 @@ >>> #include <config.h> >>> #undef NDEBUG >>> #include "fatal-signal.h" >>> +#include "ovs-atomic.h" >>> #include "ovs-rcu.h" >>> #include "ovs-thread.h" >>> #include "ovstest.h" >>> +#include "seq.h" >>> +#include "timeval.h" >>> #include "util.h" >>> >>> +#include "openvswitch/poll-loop.h" >>> + >>> static void * >>> quiescer_main(void *aux OVS_UNUSED) >>> { >>> @@ -67,10 +72,136 @@ test_rcu_barrier(void) >>> ovs_assert(count == 10); >>> } >>> >>> +struct element { >>> + struct ovsrcu_node rcu_node; >>> + struct seq *trigger; >>> + atomic_bool wait; >>> +}; >>> + >>> +static void >>> +trigger_cb(void *e_) >>> +{ >>> + struct element *e = (struct element *) e_; >>> + >>> + seq_change(e->trigger); >> >> Maybe add a counter in struct element to verify the callback >> executes exactly once, catching potential double-execution bugs. >> > > Ack. > >>> +} >>> + >>> +static void * >>> +wait_main(void *aux) >>> +{ >>> + struct element *e = aux; >>> + >>> + for (;;) { >>> + bool wait; >>> + >>> + atomic_read(&e->wait, &wait); >>> + if (!wait) { >>> + break; >>> + } >>> + } >>> + >>> + seq_wait(e->trigger, seq_read(e->trigger)); >>> + poll_block(); >>> + >>> + return NULL; >>> +} >>> + >>> +static void >>> +test_rcu_postpone_embedded(bool multithread) >>> +{ >>> + long long int timeout; >>> + pthread_t waiter; >>> + struct element e; >>> + uint64_t seqno; >>> + >>> + atomic_init(&e.wait, true); >>> + >>> + if (multithread) { >>> + waiter = ovs_thread_create("waiter", wait_main, &e); >>> + } >>> + >>> + e.trigger = seq_create(); >>> + seqno = seq_read(e.trigger); >>> + >>> + ovsrcu_postpone_embedded(trigger_cb, &e, rcu_node); >>> + >>> + /* Check that GC holds out until all threads are quiescent. */ >>> + timeout = time_msec(); >>> + if (multithread) { >>> + timeout += 200; >>> + } >>> + while (time_msec() <= timeout) { >>> + ovs_assert(seq_read(e.trigger) == seqno); >>> + } >>> + >>> + atomic_store(&e.wait, false); >>> + >>> + seq_wait(e.trigger, seqno); >>> + poll_timer_wait_until(time_msec() + 200); >>> + poll_block(); >>> + >>> + /* Verify that GC executed. 
*/ >>> + ovs_assert(seq_read(e.trigger) != seqno); >>> + seq_destroy(e.trigger); >>> + >>> + if (multithread) { >>> + xpthread_join(waiter, NULL); >>> + } >>> +} >>> + >>> +#define N_ORDER_CBS 5 >>> + >>> +struct order_element { >>> + struct ovsrcu_node rcu_node; >>> + int id; >>> + int *log; >>> + int *log_idx; >>> +}; >>> + >>> +static void >>> +order_cb(void *aux) >>> +{ >>> + struct order_element *e = aux; >>> + e->log[(*e->log_idx)++] = e->id; >>> +} >>> + >>> +static void >>> +test_rcu_ordering(void) >>> +{ >> >> The documentation states "All functions postponed by a single thread are >> guaranteed to execute in the order they were postponed", the test should >> verify this for mixed ovsrcu_postpone() and ovsrcu_postpone_embedded() >> calls. >> > > Currently, ordering is only guaranteed within each 'scheduling types' > (cbset or embedded). > > This is a simpler implementation. I can either > > 1. Modify the doc to specify this point. > > or > > 2. Modify the scheduling such that postponing into a cbset will trigger > creating a new cbset if an embedded not is already at the tail of the > thread-local node list: this way, each cbset executes in the right > order, at the cost of potentially many cbset under-utilized (wasted of > memory). > > I am assuming you will prefer (2.), if not let me know. Since the embedded option is newly introduced, we could document that ordering is guaranteed within each callback type (embedded vs. non-embedded), but not between types. That is, all embedded callbacks execute in order, followed by all non-embedded callbacks in order. It feels like solution 2 is not worth the overhead. >>> + struct order_element elems[N_ORDER_CBS]; >>> + int log[N_ORDER_CBS]; >>> + int log_idx = 0; >>> + >>> + for (int i = 0; i < N_ORDER_CBS; i++) { >>> + elems[i].id = i; >>> + elems[i].log = log; >>> + elems[i].log_idx = &log_idx; >>> + ovsrcu_postpone_embedded(order_cb, &elems[i], rcu_node); >>> + } >>> + >>> + ovsrcu_barrier(); >>> + >>> + ovs_assert(log_idx == N_ORDER_CBS); >>> + for (int i = 0; i < N_ORDER_CBS; i++) { >>> + if (log[i] != i) { >>> + ovs_abort(0, "RCU embedded callback ordering violated: " >>> + "expected cb %d at position %d, got %d", >>> + i, i, log[i]); >>> + } >>> + } >>> +} >>> + >>> static void >>> test_rcu(int argc OVS_UNUSED, char *argv[] OVS_UNUSED) { >>> + const bool multithread = true; >>> + >>> + /* Execute single-threaded check before spawning additional threads. */ >>> + test_rcu_postpone_embedded(!multithread); >>> + test_rcu_postpone_embedded(multithread); >>> + >>> test_rcu_quiesce(); >>> test_rcu_barrier(); >>> + test_rcu_ordering(); >> >> test_rcu_ordering() should be tested in both single-threaded and >> multi-threaded modes like test_rcu_postpone_embedded() to verify >> ordering guarantees hold in both RCU execution paths. >> > > Ack. > >>> } >>> >>> OVSTEST_REGISTER("test-rcu", test_rcu); >>
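For reference, the self-counting variant of guarded_list_push_back_all() sketched in the reply above would look roughly as follows as a complete function. Note that it drops the caller-supplied 'n', so the prototype added to guarded-list.h would change accordingly, and the helper may disappear entirely if the flushed list becomes a plain ovs_list plus mutex as discussed; treat this as a sketch of one option, not the agreed outcome.

    void
    guarded_list_push_back_all(struct guarded_list *list, struct ovs_list *nodes)
    {
        /* Compute the length here instead of trusting the caller.  'nodes' is
         * expected to be private to the caller (the thread-local pending
         * list), so reading it outside the lock is fine. */
        size_t n = ovs_list_size(nodes);

        ovs_mutex_lock(&list->mutex);
        ovs_list_push_back_all(&list->list, nodes);   /* Leaves 'nodes' empty. */
        list->n += n;
        ovs_mutex_unlock(&list->mutex);
    }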
diff --git a/lib/guarded-list.c b/lib/guarded-list.c
index 2186d074e..bb77fb55f 100644
--- a/lib/guarded-list.c
+++ b/lib/guarded-list.c
@@ -65,6 +65,16 @@ guarded_list_push_back(struct guarded_list *list,
     return retval;
 }
 
+void
+guarded_list_push_back_all(struct guarded_list *list,
+                           struct ovs_list *nodes, size_t n)
+{
+    ovs_mutex_lock(&list->mutex);
+    ovs_list_push_back_all(&list->list, nodes);
+    list->n += n;
+    ovs_mutex_unlock(&list->mutex);
+}
+
 struct ovs_list *
 guarded_list_pop_front(struct guarded_list *list)
 {
diff --git a/lib/guarded-list.h b/lib/guarded-list.h
index 80ce22c12..b575dc425 100644
--- a/lib/guarded-list.h
+++ b/lib/guarded-list.h
@@ -40,6 +40,8 @@ bool guarded_list_is_empty(const struct guarded_list *);
 
 size_t guarded_list_push_back(struct guarded_list *, struct ovs_list *,
                               size_t max);
+void guarded_list_push_back_all(struct guarded_list *, struct ovs_list *,
+                                size_t n);
 struct ovs_list *guarded_list_pop_front(struct guarded_list *);
 size_t guarded_list_pop_all(struct guarded_list *, struct ovs_list *);
 
diff --git a/lib/ovs-rcu.c b/lib/ovs-rcu.c
index 49afcc55c..54e6c469d 100644
--- a/lib/ovs-rcu.c
+++ b/lib/ovs-rcu.c
@@ -38,7 +38,7 @@ struct ovsrcu_cb {
 };
 
 struct ovsrcu_cbset {
-    struct ovs_list list_node;
+    struct ovsrcu_node rcu_node;
     struct ovsrcu_cb *cbs;
     size_t n_allocated;
     int n_cbs;
@@ -49,6 +49,8 @@ struct ovsrcu_perthread {
 
     uint64_t seqno;
     struct ovsrcu_cbset *cbset;
+    struct ovs_list pending;    /* Thread-local list of ovsrcu_node. */
+    size_t n_pending;
     char name[16];              /* This thread's name. */
 };
 
@@ -58,15 +60,15 @@ static pthread_key_t perthread_key;
 static struct ovs_list ovsrcu_threads;
 static struct ovs_mutex ovsrcu_threads_mutex;
 
-static struct guarded_list flushed_cbsets;
-static struct seq *flushed_cbsets_seq;
+static struct guarded_list flushed_nodes;
+static struct seq *flushed_nodes_seq;
 
 static struct latch postpone_exit;
 static struct ovs_barrier postpone_barrier;
 
 static void ovsrcu_init_module(void);
-static void ovsrcu_flush_cbset__(struct ovsrcu_perthread *, bool);
-static void ovsrcu_flush_cbset(struct ovsrcu_perthread *);
+static void ovsrcu_flush_nodes__(struct ovsrcu_perthread *, bool);
+static void ovsrcu_flush_nodes(struct ovsrcu_perthread *);
 static void ovsrcu_unregister__(struct ovsrcu_perthread *);
 static bool ovsrcu_call_postponed(void);
 static void *ovsrcu_postpone_thread(void *arg OVS_UNUSED);
@@ -85,6 +87,8 @@ ovsrcu_perthread_get(void)
         perthread = xmalloc(sizeof *perthread);
         perthread->seqno = seq_read(global_seqno);
         perthread->cbset = NULL;
+        ovs_list_init(&perthread->pending);
+        perthread->n_pending = 0;
         ovs_strlcpy(perthread->name, name[0] ? name : "main",
                     sizeof perthread->name);
 
@@ -153,9 +157,7 @@ ovsrcu_quiesce(void)
 
     perthread = ovsrcu_perthread_get();
     perthread->seqno = seq_read(global_seqno);
-    if (perthread->cbset) {
-        ovsrcu_flush_cbset(perthread);
-    }
+    ovsrcu_flush_nodes(perthread);
     seq_change(global_seqno);
 
     ovsrcu_quiesced();
@@ -171,9 +173,7 @@ ovsrcu_try_quiesce(void)
     perthread = ovsrcu_perthread_get();
     if (!seq_try_lock()) {
         perthread->seqno = seq_read(global_seqno);
-        if (perthread->cbset) {
-            ovsrcu_flush_cbset__(perthread, true);
-        }
+        ovsrcu_flush_nodes__(perthread, true);
         seq_change_protected(global_seqno);
         seq_unlock();
         ovsrcu_quiesced();
@@ -264,10 +264,10 @@ ovsrcu_exit(void)
     /* Repeatedly:
      *
      *     - Wait for a grace period.  One important side effect is to push the
-     *       running thread's cbset into 'flushed_cbsets' so that the next call
+     *       running thread's nodes into 'flushed_nodes' so that the next call
      *       has something to call.
      *
-     *     - Call all the callbacks in 'flushed_cbsets'.  If there aren't any,
+     *     - Call all the callbacks in 'flushed_nodes'.  If there aren't any,
      *       we're done, otherwise the callbacks themselves might have requested
      *       more deferred callbacks so we go around again.
      *
@@ -282,6 +282,32 @@ ovsrcu_exit(void)
     }
 }
 
+static void
+ovsrcu_run_cbset(void *aux)
+{
+    struct ovsrcu_cbset *cbset = aux;
+    struct ovsrcu_cb *cb;
+
+    for (cb = cbset->cbs; cb < &cbset->cbs[cbset->n_cbs]; cb++) {
+        cb->function(cb->aux);
+    }
+
+    free(cbset->cbs);
+    free(cbset);
+}
+
+void
+ovsrcu_postpone_embedded__(void (*function)(void *aux), void *aux,
+                           struct ovsrcu_node *rcu_node)
+{
+    struct ovsrcu_perthread *perthread = ovsrcu_perthread_get();
+
+    rcu_node->cb = function;
+    rcu_node->aux = aux;
+    ovs_list_push_back(&perthread->pending, &rcu_node->list_node);
+    perthread->n_pending++;
+}
+
 /* Registers 'function' to be called, passing 'aux' as argument, after the
  * next grace period.
  *
@@ -314,6 +340,7 @@ ovsrcu_postpone__(void (*function)(void *aux), void *aux)
         cbset->cbs = xmalloc(MIN_CBS * sizeof *cbset->cbs);
         cbset->n_allocated = MIN_CBS;
         cbset->n_cbs = 0;
+        ovsrcu_postpone_embedded(ovsrcu_run_cbset, cbset, rcu_node);
     }
 
     if (cbset->n_cbs == cbset->n_allocated) {
@@ -329,24 +356,18 @@ ovsrcu_postpone__(void (*function)(void *aux), void *aux)
 static bool OVS_NO_SANITIZE_FUNCTION
 ovsrcu_call_postponed(void)
 {
-    struct ovsrcu_cbset *cbset;
-    struct ovs_list cbsets;
+    struct ovs_list nodes = OVS_LIST_INITIALIZER(&nodes);
+    struct ovsrcu_node *node;
 
-    guarded_list_pop_all(&flushed_cbsets, &cbsets);
-    if (ovs_list_is_empty(&cbsets)) {
+    guarded_list_pop_all(&flushed_nodes, &nodes);
+    if (ovs_list_is_empty(&nodes)) {
         return false;
     }
 
     ovsrcu_synchronize();
 
-    LIST_FOR_EACH_POP (cbset, list_node, &cbsets) {
-        struct ovsrcu_cb *cb;
-
-        for (cb = cbset->cbs; cb < &cbset->cbs[cbset->n_cbs]; cb++) {
-            cb->function(cb->aux);
-        }
-        free(cbset->cbs);
-        free(cbset);
+    LIST_FOR_EACH_POP (node, list_node, &nodes) {
+        node->cb(node->aux);
     }
 
     return true;
@@ -358,9 +379,9 @@ ovsrcu_postpone_thread(void *arg OVS_UNUSED)
     pthread_detach(pthread_self());
 
     while (!latch_is_set(&postpone_exit)) {
-        uint64_t seqno = seq_read(flushed_cbsets_seq);
+        uint64_t cb_seqno = seq_read(flushed_nodes_seq);
         if (!ovsrcu_call_postponed()) {
-            seq_wait(flushed_cbsets_seq, seqno);
+            seq_wait(flushed_nodes_seq, cb_seqno);
             latch_wait(&postpone_exit);
             poll_block();
         }
@@ -371,33 +392,36 @@ ovsrcu_postpone_thread(void *arg OVS_UNUSED)
 }
 
 static void
-ovsrcu_flush_cbset__(struct ovsrcu_perthread *perthread, bool protected)
+ovsrcu_flush_nodes__(struct ovsrcu_perthread *perthread, bool protected)
 {
-    struct ovsrcu_cbset *cbset = perthread->cbset;
+    if (ovs_list_is_empty(&perthread->pending)) {
+        return;
+    }
 
-    if (cbset) {
-        guarded_list_push_back(&flushed_cbsets, &cbset->list_node, SIZE_MAX);
-        perthread->cbset = NULL;
+    perthread->cbset = NULL;
+    guarded_list_push_back_all(&flushed_nodes, &perthread->pending,
+                               perthread->n_pending);
+    ovs_list_init(&perthread->pending);
+    perthread->n_pending = 0;
 
-        if (protected) {
-            seq_change_protected(flushed_cbsets_seq);
-        } else {
-            seq_change(flushed_cbsets_seq);
-        }
+    if (protected) {
+        seq_change_protected(flushed_nodes_seq);
+    } else {
+        seq_change(flushed_nodes_seq);
     }
 }
 
 static void
-ovsrcu_flush_cbset(struct ovsrcu_perthread *perthread)
+ovsrcu_flush_nodes(struct ovsrcu_perthread *perthread)
 {
-    ovsrcu_flush_cbset__(perthread, false);
+    ovsrcu_flush_nodes__(perthread, false);
 }
 
 static void
 ovsrcu_unregister__(struct ovsrcu_perthread *perthread)
 {
-    if (perthread->cbset) {
-        ovsrcu_flush_cbset(perthread);
+    if (!ovs_list_is_empty(&perthread->pending)) {
+        ovsrcu_flush_nodes(perthread);
     }
 
     ovs_mutex_lock(&ovsrcu_threads_mutex);
@@ -438,8 +462,8 @@ ovsrcu_init_module(void)
         ovs_list_init(&ovsrcu_threads);
         ovs_mutex_init(&ovsrcu_threads_mutex);
 
-        guarded_list_init(&flushed_cbsets);
-        flushed_cbsets_seq = seq_create();
+        guarded_list_init(&flushed_nodes);
+        flushed_nodes_seq = seq_create();
 
         ovsthread_once_done(&once);
     }
diff --git a/lib/ovs-rcu.h b/lib/ovs-rcu.h
index a1c15c126..efd43a1a2 100644
--- a/lib/ovs-rcu.h
+++ b/lib/ovs-rcu.h
@@ -125,6 +125,22 @@
  *         ovs_mutex_unlock(&mutex);
  *     }
  *
+ * As an alternative to ovsrcu_postpone(), the same deferred execution can be
+ * achieved using ovsrcu_postpone_embedded():
+ *
+ *     struct deferrable {
+ *         struct ovsrcu_node rcu_node;
+ *     };
+ *
+ *     void
+ *     deferred_free(struct deferrable *d)
+ *     {
+ *         ovsrcu_postpone_embedded(free, d, rcu_node);
+ *     }
+ *
+ * Using embedded fields can be preferred sometimes to avoid the small
+ * allocations done in ovsrcu_postpone().
+ *
  * In some rare cases an object may not be addressable with a pointer, but only
  * through an array index (e.g. because it's provided by another library).  It
  * is still possible to have RCU semantics by using the ovsrcu_index type.
@@ -173,6 +189,8 @@
 #include "compiler.h"
 #include "ovs-atomic.h"
 
+#include "openvswitch/list.h"
+
 #if __GNUC__
 #define OVSRCU_TYPE(TYPE) struct { ATOMIC(TYPE) p; }
 #define OVSRCU_INITIALIZER(VALUE) { VALUE }
@@ -256,6 +274,27 @@ void ovsrcu_postpone__(void (*function)(void *aux), void *aux);
      (void) sizeof(*(ARG)),                                     \
      ovsrcu_postpone__((void (*)(void *))(FUNCTION), ARG))
 
+struct ovsrcu_node {
+    struct ovs_list list_node;
+    void (*cb)(void *aux);
+    void *aux;
+};
+
+/* Calls FUNCTION passing ARG as its pointer-type argument, which
+ * contains an 'ovsrcu_node' as a field named MEMBER.  The function
+ * is called following the next grace period.  See 'Usage' above for an
+ * example.
+ */
+void ovsrcu_postpone_embedded__(void (*function)(void *aux), void *aux,
+                                struct ovsrcu_node *node);
+#define ovsrcu_postpone_embedded(FUNCTION, ARG, MEMBER)              \
+    (/* Verify that ARG is appropriate for FUNCTION. */              \
+     (void) sizeof((FUNCTION)(ARG), 1),                              \
+     /* Verify that ARG is a pointer type. */                        \
+     (void) sizeof(*(ARG)),                                          \
+     ovsrcu_postpone_embedded__((void (*)(void *))(FUNCTION), ARG,   \
+                                &(ARG)->MEMBER))
+
 /* An array index protected by RCU semantics.  This is an easier alternative to
  * an RCU protected pointer to a malloc'd int. */
 typedef struct { atomic_int v; } ovsrcu_index;
diff --git a/tests/test-rcu.c b/tests/test-rcu.c
index bb17092bf..26150e7d9 100644
--- a/tests/test-rcu.c
+++ b/tests/test-rcu.c
@@ -17,11 +17,16 @@
 #include <config.h>
 #undef NDEBUG
 #include "fatal-signal.h"
+#include "ovs-atomic.h"
 #include "ovs-rcu.h"
 #include "ovs-thread.h"
 #include "ovstest.h"
+#include "seq.h"
+#include "timeval.h"
 #include "util.h"
 
+#include "openvswitch/poll-loop.h"
+
 static void *
 quiescer_main(void *aux OVS_UNUSED)
 {
@@ -67,10 +72,136 @@ test_rcu_barrier(void)
     ovs_assert(count == 10);
 }
 
+struct element {
+    struct ovsrcu_node rcu_node;
+    struct seq *trigger;
+    atomic_bool wait;
+};
+
+static void
+trigger_cb(void *e_)
+{
+    struct element *e = (struct element *) e_;
+
+    seq_change(e->trigger);
+}
+
+static void *
+wait_main(void *aux)
+{
+    struct element *e = aux;
+
+    for (;;) {
+        bool wait;
+
+        atomic_read(&e->wait, &wait);
+        if (!wait) {
+            break;
+        }
+    }
+
+    seq_wait(e->trigger, seq_read(e->trigger));
+    poll_block();
+
+    return NULL;
+}
+
+static void
+test_rcu_postpone_embedded(bool multithread)
+{
+    long long int timeout;
+    pthread_t waiter;
+    struct element e;
+    uint64_t seqno;
+
+    atomic_init(&e.wait, true);
+
+    if (multithread) {
+        waiter = ovs_thread_create("waiter", wait_main, &e);
+    }
+
+    e.trigger = seq_create();
+    seqno = seq_read(e.trigger);
+
+    ovsrcu_postpone_embedded(trigger_cb, &e, rcu_node);
+
+    /* Check that GC holds out until all threads are quiescent. */
+    timeout = time_msec();
+    if (multithread) {
+        timeout += 200;
+    }
+    while (time_msec() <= timeout) {
+        ovs_assert(seq_read(e.trigger) == seqno);
+    }
+
+    atomic_store(&e.wait, false);
+
+    seq_wait(e.trigger, seqno);
+    poll_timer_wait_until(time_msec() + 200);
+    poll_block();
+
+    /* Verify that GC executed. */
+    ovs_assert(seq_read(e.trigger) != seqno);
+    seq_destroy(e.trigger);
+
+    if (multithread) {
+        xpthread_join(waiter, NULL);
+    }
+}
+
+#define N_ORDER_CBS 5
+
+struct order_element {
+    struct ovsrcu_node rcu_node;
+    int id;
+    int *log;
+    int *log_idx;
+};
+
+static void
+order_cb(void *aux)
+{
+    struct order_element *e = aux;
+    e->log[(*e->log_idx)++] = e->id;
+}
+
+static void
+test_rcu_ordering(void)
+{
+    struct order_element elems[N_ORDER_CBS];
+    int log[N_ORDER_CBS];
+    int log_idx = 0;
+
+    for (int i = 0; i < N_ORDER_CBS; i++) {
+        elems[i].id = i;
+        elems[i].log = log;
+        elems[i].log_idx = &log_idx;
+        ovsrcu_postpone_embedded(order_cb, &elems[i], rcu_node);
+    }
+
+    ovsrcu_barrier();
+
+    ovs_assert(log_idx == N_ORDER_CBS);
+    for (int i = 0; i < N_ORDER_CBS; i++) {
+        if (log[i] != i) {
+            ovs_abort(0, "RCU embedded callback ordering violated: "
+                      "expected cb %d at position %d, got %d",
+                      i, i, log[i]);
+        }
+    }
+}
+
 static void
 test_rcu(int argc OVS_UNUSED, char *argv[] OVS_UNUSED) {
+    const bool multithread = true;
+
+    /* Execute single-threaded check before spawning additional threads. */
+    test_rcu_postpone_embedded(!multithread);
+    test_rcu_postpone_embedded(multithread);
+
     test_rcu_quiesce();
     test_rcu_barrier();
+    test_rcu_ordering();
 }
 
 OVSTEST_REGISTER("test-rcu", test_rcu);
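As an aside, a minimal caller-side sketch of the intended use of the embedded
variant, mirroring the usage comment added to lib/ovs-rcu.h above.  The
'struct idle_conn' type and its functions are hypothetical and not part of
this patch; the point is only that the deferral node lives inside the object
being freed, so no per-callback allocation is needed.

#include <config.h>
#include "ovs-rcu.h"
#include "util.h"

struct idle_conn {
    struct ovsrcu_node rcu_node;    /* Storage for the deferred call. */
    int fd;
};

static void
idle_conn_free(struct idle_conn *conn)
{
    free(conn);
}

static void
idle_conn_destroy(struct idle_conn *conn)
{
    /* Equivalent to ovsrcu_postpone(idle_conn_free, conn), but reuses the
     * embedded 'rcu_node' instead of allocating a callback slot. */
    ovsrcu_postpone_embedded(idle_conn_free, conn, rcu_node);
}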