diff mbox series

[ovs-dev,2/6] lib: Introduce cooperative multitasking module.

Message ID 20240110192939.15220-3-frode.nordahl@canonical.com
State Changes Requested
Delegated to: Ilya Maximets
Headers show
Series Introduce cooperative multitasking to improve OVSDB RAFT cluster operation. | expand

Checks

Context Check Description
ovsrobot/apply-robot success apply and check: success
ovsrobot/github-robot-_Build_and_Test success github build: passed
ovsrobot/intel-ovs-compilation success test: success

Commit Message

Frode Nordahl Jan. 10, 2024, 7:29 p.m. UTC
One of the goals of Open vSwitch is to be as resource efficient as
possible.  Core parts of the program has been implemented as
asynchronous state machines, and when absolutely necessary
additional threads are used.

Introduce cooperative multitasking module which allow us to
interleave important processing with long running tasks while
avoiding the additional resource consumption of threads and
complexity of asynchronous state machines.

We will use this module to ensure long running processing in the
OVSDB server does not interfere with stable maintenance of the
RAFT cluster in subsequent patches.

Suggested-by: Ilya Maximets <i.maximets@ovn.org>
Signed-off-by: Frode Nordahl <frode.nordahl@canonical.com>
---
 lib/automake.mk                        |   3 +
 lib/cooperative-multitasking-private.h |  31 +++
 lib/cooperative-multitasking.c         | 195 +++++++++++++++++++
 lib/cooperative-multitasking.h         |  42 ++++
 tests/automake.mk                      |   1 +
 tests/library.at                       |  10 +
 tests/ovsdb-server.at                  |   1 +
 tests/test-cooperative-multitasking.c  | 259 +++++++++++++++++++++++++
 8 files changed, 542 insertions(+)
 create mode 100644 lib/cooperative-multitasking-private.h
 create mode 100644 lib/cooperative-multitasking.c
 create mode 100644 lib/cooperative-multitasking.h
 create mode 100644 tests/test-cooperative-multitasking.c

Comments

Ilya Maximets Jan. 11, 2024, 11:26 p.m. UTC | #1
On 1/10/24 20:29, Frode Nordahl wrote:
> One of the goals of Open vSwitch is to be as resource efficient as
> possible.  Core parts of the program has been implemented as
> asynchronous state machines, and when absolutely necessary
> additional threads are used.
> 
> Introduce cooperative multitasking module which allow us to
> interleave important processing with long running tasks while
> avoiding the additional resource consumption of threads and
> complexity of asynchronous state machines.
> 
> We will use this module to ensure long running processing in the
> OVSDB server does not interfere with stable maintenance of the
> RAFT cluster in subsequent patches.
> 
> Suggested-by: Ilya Maximets <i.maximets@ovn.org>
> Signed-off-by: Frode Nordahl <frode.nordahl@canonical.com>
> ---
>  lib/automake.mk                        |   3 +
>  lib/cooperative-multitasking-private.h |  31 +++
>  lib/cooperative-multitasking.c         | 195 +++++++++++++++++++
>  lib/cooperative-multitasking.h         |  42 ++++
>  tests/automake.mk                      |   1 +
>  tests/library.at                       |  10 +
>  tests/ovsdb-server.at                  |   1 +
>  tests/test-cooperative-multitasking.c  | 259 +++++++++++++++++++++++++
>  8 files changed, 542 insertions(+)
>  create mode 100644 lib/cooperative-multitasking-private.h
>  create mode 100644 lib/cooperative-multitasking.c
>  create mode 100644 lib/cooperative-multitasking.h
>  create mode 100644 tests/test-cooperative-multitasking.c

Thanks for working on this!  See some comments inline.

> 
> diff --git a/lib/automake.mk b/lib/automake.mk
> index 0dc8a35cc..8596171c6 100644
> --- a/lib/automake.mk
> +++ b/lib/automake.mk
> @@ -94,6 +94,9 @@ lib_libopenvswitch_la_SOURCES = \
>  	lib/conntrack-other.c \
>  	lib/conntrack.c \
>  	lib/conntrack.h \
> +	lib/cooperative-multitasking.c \
> +	lib/cooperative-multitasking.h \
> +	lib/cooperative-multitasking-private.h \
>  	lib/coverage.c \
>  	lib/coverage.h \
>  	lib/cpu.c \
> diff --git a/lib/cooperative-multitasking-private.h b/lib/cooperative-multitasking-private.h
> new file mode 100644
> index 000000000..b2e4e7291
> --- /dev/null
> +++ b/lib/cooperative-multitasking-private.h
> @@ -0,0 +1,31 @@
> +/*
> + * Copyright (c) 2024 Canonical Ltd.
> + *
> + * Licensed under the Apache License, Version 2.0 (the "License");
> + * you may not use this file except in compliance with the License.
> + * You may obtain a copy of the License at:
> + *
> + *     http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +#ifndef COOPERATIVE_MULTITASKING_PRIVATE_H
> +#define COOPERATIVE_MULTITASKING_PRIVATE_H 1
> +
> +#include "openvswitch/hmap.h"
> +
> +struct cooperative_multitasking_callback {
> +    struct hmap_node node;
> +    void (*cb)(void *);
> +    void *arg;
> +    long long int time_threshold;
> +    long long int last_run;
> +    const char *msg;
> +};
> +
> +#endif /* COOPERATIVE_MULTITASKING_PRIVATE_H */
> diff --git a/lib/cooperative-multitasking.c b/lib/cooperative-multitasking.c
> new file mode 100644
> index 000000000..1b1205e8f
> --- /dev/null
> +++ b/lib/cooperative-multitasking.c
> @@ -0,0 +1,195 @@
> +/*
> + * Copyright (c) 2023 Canonical Ltd.

2024 ? :)

Same for other files.

> + *
> + * Licensed under the Apache License, Version 2.0 (the "License");
> + * you may not use this file except in compliance with the License.
> + * You may obtain a copy of the License at:
> + *
> + *     http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +#include <config.h>
> +
> +#include "backtrace.h"
> +#include "cooperative-multitasking-private.h"
> +#include "cooperative-multitasking.h"
> +#include "hash.h"
> +#include "openvswitch/hmap.h"
> +#include "openvswitch/vlog.h"
> +#include "timeval.h"
> +
> +VLOG_DEFINE_THIS_MODULE(cooperative_multitasking);
> +
> +static struct hmap *cooperative_multitasking_callbacks = NULL;

I wonder if we can use a shorter name.  You're using 'cm_callbacks'
in the test, for example.  It is an internal hash map after all.
Probably, no need to have an extra fancy name.

> +
> +/* One time initialization for process that wants to make use of cooperative
> + * multitasking module.  References to data is stored in 'hmap_container' and
> + * will be referenced by all calls to this module.  The ownership of the
> + * container itself remains with the caller while the data in the hmap is owned
> + * by this module and must be freed with a call to
> + * cooperative_multitasking_destroy().
> + *
> + * The purpose of having the caller own 'hmap_container' is:
> + * 1) Allow runtime decision whether to use cooperative multitasking without
> + *    having to pass data between loosely connected parts of a program.  This
> + *    is useful for the raft code which is consumed by both the ovsdb-server
> + *    daemon and the ovsdb-tool CLI utility.
> + * 2) Allow inspection of internal data by unit tests. */

Can we instead define the hash map here and export it as extern in the
internal header?  It looks a little strange to make a module to hold a
static hash map.  At the same time no two modules can call init.
It can also be statically initialized with HMAP_INITIALIZER(), so no
init function will be necessary.

> +void
> +cooperative_multitasking_init(struct hmap *hmap_container)
> +{
> +    cooperative_multitasking_callbacks = hmap_container;
> +    hmap_init(cooperative_multitasking_callbacks);
> +}
> +
> +/* Register callback 'cb' with argument 'arg' to be called when
> + * cooperating long running functions yield and 'time_threshold' msec has
> + * passed since the last call to the function.  If the optional 'msg' is not
> + * NULL it will be used when logging time threshold overrun conditions.
> + *
> + * It is possible to register the same callback multiple times as long as 'arg'
> + * is different for each registration.  It is up to the caller to ensure no
> + * unwanted duplicates are registered.
> + *
> + * The callback is expected to update the timestamp for last run with a call to
> + * cooperative_multitasking_update() using the same values for 'cb' and 'arg'.
> + */
> +void
> +cooperative_multitasking_register(void (*cb)(void *), void *arg,
> +                                  long long int time_threshold,
> +                                  const char *msg)
> +{
> +    if (!cooperative_multitasking_callbacks) {
> +        return;
> +    }
> +
> +    struct cooperative_multitasking_callback *cm_entry;

The structure is named as 'callback', but you're always using
'entry' as a variable name.  Maybe rename the structure?

> +
> +    cm_entry = xzalloc(sizeof *cm_entry);
> +    cm_entry->cb = cb;
> +    cm_entry->arg = arg;
> +    cm_entry->time_threshold = time_threshold;
> +    cm_entry->last_run = time_msec();
> +    cm_entry->msg = msg;
> +
> +    hmap_insert(cooperative_multitasking_callbacks,
> +                &cm_entry->node,
> +                hash_pointer(
> +                    cm_entry->arg ? cm_entry->arg : (void *) cm_entry->cb, 0));

Why hashing the arg instead of callback?  I suppose that makes
sense in context that we'll have only one user with a single
function and different arguments.  But it seems backwards from
the point of view of a library.

In practice, ovsdb-server usually serves only one clustered
database, so there should be no difference either way.
If it's 2 databases, should also not be a problem.  But hashing
the callback seems more logical.

What do you think?

> +}
> +
> +/* Free any data allocated by calls to cooperative_multitasking_register(). */
> +void
> +cooperative_multitasking_destroy(void)
> +{
> +    struct cooperative_multitasking_callback *cm_entry;
> +    HMAP_FOR_EACH_SAFE (cm_entry, node, cooperative_multitasking_callbacks) {
> +        hmap_remove(cooperative_multitasking_callbacks, &cm_entry->node);
> +        free(cm_entry);
> +    }
> +}
> +
> +/* Update data for already registered callback identified by 'cb' and 'arg'.
> + *
> + * The value for 'last_run' must at a minimal be updated each time the callback
> + * is run.  It can also be useful to update for multiple entry points to the
> + * part serviced by the callback to avoid unnecessary callbacks on next call to
> + * cooperative_multitasking_yield().

I'm not sure I understand this last sentence.

> + *
> + * Updating the value for 'time_threshold' may be necessary as a consequence of
> + * the change in runtime configuration or requirements of the serviced
> + * callback.
> + *
> + * Providing a value of 0 for 'last_run' or 'time_threshold' will result in
> + * the respective stored value left untouched. */
> +void
> +cooperative_multitasking_update(void (*cb)(void *), void *arg,
> +                                long long int last_run,
> +                                long long int time_threshold)
> +{
> +    if (!cooperative_multitasking_callbacks) {
> +        return;
> +    }
> +
> +    struct cooperative_multitasking_callback *cm_entry;
> +
> +    HMAP_FOR_EACH_WITH_HASH (cm_entry, node,
> +                             hash_pointer(arg ? arg : (void *) cb, 0),
> +                             cooperative_multitasking_callbacks)
> +    {
> +        if (cm_entry->cb == cb && cm_entry->arg == arg) {
> +            if (last_run) {
> +                cm_entry->last_run = last_run;
> +            }
> +
> +            if (time_threshold) {
> +                cm_entry->time_threshold = time_threshold;
> +            }
> +            return;

The register() function doesn't check for duplicates, so there
can be duplicates if used inaccurately and they will not be
updated.

Should we just have a single function like _set() that would
find and update or create if not found?  Instead of relying on
users to not add the same thing twice.

> +        }
> +    }
> +}
> +
> +static void
> +cooperative_multitasking_yield_at__(const char *source_location)
> +{
> +    long long int now = time_msec();
> +    struct cooperative_multitasking_callback *cm_entry;

Reverse x-mass tree.

> +
> +    HMAP_FOR_EACH (cm_entry, node, cooperative_multitasking_callbacks) {
> +        long long int elapsed = now - cm_entry->last_run;
> +
> +        if (elapsed >= cm_entry->time_threshold) {
> +            VLOG_DBG("yield called from %s: "
> +                     "%lld: %lld >= %lld, executing %p(%p)",
> +                     source_location, now, elapsed, cm_entry->time_threshold,
> +                     cm_entry->cb, cm_entry->arg);

Maybe an empty line here.
Also, the pointers are not very informative.  Should something
like 'name' be added to the structure?  So, it will be %s(%p).

Not sure why logging 'now'.  Maybe re-organize things a little:

"%{source}: yield for %{name}: elapsed(%lld) >= threshold(%lld), overrun: %lld"

What do you think?

> +            (*cm_entry->cb)(cm_entry->arg);
> +            if (elapsed - cm_entry->time_threshold >
> +                cm_entry->time_threshold / 8)

Callback updates thresholds, so the check should be done before
calling it.   Maybe 'time_' part of the structure filed is
redundant.  Without it the lines will be shorter.

> +            {

Should be on a previous line.

> +                VLOG_WARN("yield threshold overrun with %lld msec. %s",
> +                          elapsed - cm_entry->time_threshold,
> +                          cm_entry->msg ? cm_entry->msg : "");

If we add a name, we should print the callback name here as well.
And we may even drop the 'msg', because it will be more or less
clear what went wrong if we know that it is a raft_run, for example.

Also, we could combine two log calls into one by using base VLOG()
macro and choosing the log level conditionally, e.g.:

    bool warn = elapsed - entry->threshold > entry->threshold / 8;

    VLOG(warn ? VLL_WARN : VLL_DBG, ... );
    if (warn && VLOG_IS_DBG_ENABLED()) {
        backtrace
    }
    entry->cb();

> +                if (VLOG_IS_DBG_ENABLED()) {
> +                    /* log_backtrace() logs at ERROR level but we only want to
> +                     * log a backtrace when DEBUG is enabled */
> +                    log_backtrace();
> +                }
> +            }

Should we also update 'now'?  The callback itself might have taken
some time.

> +        }
> +    }

Maybe log a warning if overall yield run took more than, let's say, a second?

> +}
> +
> +/* Iterate over registered callbacks and execute callbacks as demanded by the
> + * recorded time threshold. */
> +void
> +cooperative_multitasking_yield_at(const char *source_location)
> +{
> +    static bool yield_in_progress = false;
> +
> +    if (!cooperative_multitasking_callbacks) {
> +        return;
> +    }
> +
> +    if (yield_in_progress) {
> +        VLOG_ERR_ONCE("nested yield avoided, this is a bug! "
> +                      "enable debug logging for more details.");

Might be better to capitalize the first letters in sentences.
The second one is bothering me :) , but if we capitalize the
second, then we also need to do the first.

> +        if (VLOG_IS_DBG_ENABLED()) {
> +            VLOG_DBG("nested yield, called from %s", source_location);
> +            log_backtrace();
> +        }
> +        return;
> +    }
> +    yield_in_progress = true;
> +
> +    cooperative_multitasking_yield_at__(source_location);
> +
> +    yield_in_progress = false;
> +}
> diff --git a/lib/cooperative-multitasking.h b/lib/cooperative-multitasking.h
> new file mode 100644
> index 000000000..6286bfbf5
> --- /dev/null
> +++ b/lib/cooperative-multitasking.h
> @@ -0,0 +1,42 @@
> +/*
> + * Copyright (c) 2023 Canonical Ltd.
> + *
> + * Licensed under the Apache License, Version 2.0 (the "License");
> + * you may not use this file except in compliance with the License.
> + * You may obtain a copy of the License at:
> + *
> + *     http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +#ifndef COOPERATIVE_MULTITASKING_H
> +#define COOPERATIVE_MULTITASKING_H 1
> +

I think, we need a comment here describing the library and some use cases.
At least a little bit.  It should also have a thread safety section
clearly stating that the library is not thread safe can only be used from
one thread.

> +struct hmap;
> +
> +void cooperative_multitasking_init(struct hmap *);
> +
> +void cooperative_multitasking_register(void (*)(void *), void *,
> +                                       long long int, const char *);
> +#define COOPERATIVE_MULTITASKING_REGISTER(CB, ARG, TIME_THRESHOLD, MSG)       \
> +    cooperative_multitasking_register((void (*)(void *)) CB, (void *) ARG,    \
> +                                      TIME_THRESHOLD, MSG)
> +
> +void cooperative_multitasking_destroy(void);

We probably need an unregister/remove-style function as well.
For example, if we remove clustered database in runtime, we need
to make sure the callback will not be called for a destroyed
raft structure.

> +
> +void cooperative_multitasking_update(void (*)(void *), void *, long long int,
> +                                     long long int);
> +#define COOPERATIVE_MULTITASKING_UPDATE(CB, ARG, LAST_RUN, TIME_THRESHOLD)    \
> +    cooperative_multitasking_update((void (*) (void *)) CB, (void *) ARG,     \
> +                                    LAST_RUN, TIME_THRESHOLD)

I don't think we should have these macros.  I see that their purpose
is to hide pointer conversions, but unfortunately we can't really do that.
We need to make sure that all the callbacks actually have void (*) (void *))
prototypes and the arguments of these callbacks are actually void pointers.

The problem is that llvm devs decided that it is an undefined behavior
to call functions via pointer with a different type:
  http://reviews.llvm.org/D148827#4379764
And now UBSAN in clang-17 is complaining all over the place for multiple
projects:
  https://github.com/systemd/systemd/issues/29972
  https://github.com/openssl/openssl/issues/22896

This also affects OVS in rcu callbacks, but we didn't get around fixing
this yet, as release right now is much more important.
However, it'll be better to not introduce more issues like this.

Unfortunately we'll likely have to have a wrapper function like:

  void raft_run_wrap(void *arg)
  {
      struct raft *raft = (struct raft *) arg;

      raft_run(raft);
  }

And use that as a callback...  It is ugly and looks very unnecessary,
but llvm folks are not backing down from this and it looks like the only
option for generic libraries like RCU or coop-multitasking.

> +
> +void cooperative_multitasking_yield_at(const char *);
> +#define cooperative_multitasking_yield() \
> +    cooperative_multitasking_yield_at(OVS_SOURCE_LOCATOR)
> +
> +#endif /* COOPERATIVE_MULTITASKING_H */
> diff --git a/tests/automake.mk b/tests/automake.mk
> index 10c9fbb01..08c9b74d4 100644
> --- a/tests/automake.mk
> +++ b/tests/automake.mk
> @@ -456,6 +456,7 @@ tests_ovstest_SOURCES = \
>  	tests/test-ccmap.c \
>  	tests/test-cmap.c \
>  	tests/test-conntrack.c \
> +	tests/test-cooperative-multitasking.c \
>  	tests/test-csum.c \
>  	tests/test-flows.c \
>  	tests/test-hash.c \
> diff --git a/tests/library.at b/tests/library.at
> index 3f9df2f87..77d5abb01 100644
> --- a/tests/library.at
> +++ b/tests/library.at
> @@ -296,3 +296,13 @@ AT_CLEANUP
>  AT_SETUP([uuidset module])
>  AT_CHECK([ovstest test-uuidset], [0], [], [ignore])
>  AT_CLEANUP
> +
> +AT_SETUP([cooperative-multitasking module])
> +AT_CHECK([ovstest test-cooperative-multitasking], [0], [])
> +AT_CLEANUP
> +
> +AT_SETUP([cooperative-multitasking module nested yield detection])
> +AT_CHECK([ovstest test-cooperative-multitasking-nested-yield], [0], [], [dnl
> +cooperative_multitasking|ERR|nested yield avoided, this is a bug! enable debug logging for more details.
> +])
> +AT_CLEANUP
> diff --git a/tests/ovsdb-server.at b/tests/ovsdb-server.at
> index 6eb758e22..88a9a9a27 100644
> --- a/tests/ovsdb-server.at
> +++ b/tests/ovsdb-server.at
> @@ -2387,6 +2387,7 @@ m4_define([CLEAN_LOG_FILE],
>    [sed 's/[[0-9\-]]*T[[0-9:\.]]*Z|[[0-9]]*\(|.*$\)/\1/g' $1 | dnl
>     sed '/|poll_loop|/d' |   dnl
>     sed '/|socket_util|/d' | dnl
> +   sed '/|cooperative_multitasking|DBG|/d' | dnl
>     sed 's/[[0-9]]*\.ctl/<cleared>\.ctl/g'> $2])
>  
>  CLEAN_LOG_FILE([1.log], [1.log.clear])
> diff --git a/tests/test-cooperative-multitasking.c b/tests/test-cooperative-multitasking.c
> new file mode 100644
> index 000000000..ec6be865f
> --- /dev/null
> +++ b/tests/test-cooperative-multitasking.c
> @@ -0,0 +1,259 @@
> +/*
> + * Copyright (c) 2023 Canonical Ltd.
> + *
> + * Licensed under the Apache License, Version 2.0 (the "License");
> + * you may not use this file except in compliance with the License.
> + * You may obtain a copy of the License at:
> + *
> + *     http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +#include <config.h>
> +#undef NDEBUG
> +#include "cooperative-multitasking.h"
> +#include "cooperative-multitasking-private.h"
> +#include "openvswitch/hmap.h"
> +#include "ovstest.h"
> +#include "timeval.h"
> +#include "util.h"
> +#include "openvswitch/vlog.h"
> +
> +static struct hmap cm_callbacks;
> +
> +struct fixture_arg {
> +    bool called;
> +};
> +
> +static void
> +fixture_run(struct fixture_arg *arg)

These will have to have a void * arguments...

> +{
> +    COOPERATIVE_MULTITASKING_UPDATE(&fixture_run, arg, time_msec(), 0);
> +    if (arg) {
> +        arg->called = true;
> +    }
> +}
> +
> +static void
> +fixture_other_run(struct fixture_arg *arg)
> +{
> +    COOPERATIVE_MULTITASKING_UPDATE(&fixture_other_run, arg, time_msec(), 0);
> +    if (arg) {
> +        arg->called = true;
> +    }
> +}
> +
> +static void
> +test_cm_register(void)
> +{
> +    struct cooperative_multitasking_callback *cm_entry;
> +    struct fixture_arg arg1 = {
> +        .called = false,
> +    };
> +    struct fixture_arg arg2 = {
> +        .called = false,
> +    };
> +
> +    timeval_stop();
> +    long long int now = time_msec();
> +
> +    COOPERATIVE_MULTITASKING_REGISTER(&fixture_run, &arg1, 1000, NULL);
> +    COOPERATIVE_MULTITASKING_REGISTER(&fixture_run, &arg2, 2000, NULL);
> +    COOPERATIVE_MULTITASKING_REGISTER(&fixture_other_run, NULL, 3000, NULL);
> +
> +    ovs_assert(hmap_count(&cm_callbacks) == 3);
> +
> +    HMAP_FOR_EACH (cm_entry, node, &cm_callbacks) {
> +        if (cm_entry->arg == (void *)&arg1) {
> +            ovs_assert (cm_entry->cb == (void (*)(void *)) &fixture_run);
> +            ovs_assert (cm_entry->time_threshold == 1000);
> +            ovs_assert (cm_entry->last_run == now);
> +        } else if (cm_entry->arg == (void *)&arg2) {
> +            ovs_assert (cm_entry->cb == (void (*)(void *)) &fixture_run);
> +            ovs_assert (cm_entry->time_threshold == 2000);
> +            ovs_assert (cm_entry->last_run == now);
> +        } else if (cm_entry->cb == (void (*)(void *)) &fixture_other_run) {
> +            ovs_assert (cm_entry->arg == NULL);
> +            ovs_assert (cm_entry->time_threshold == 3000);
> +            ovs_assert (cm_entry->last_run == now);

Nit: no space after assert, but a space between the type and a
variable in a cast.  Same for other similar parts of the test.

> +        } else {
> +            OVS_NOT_REACHED();
> +        }
> +    }
> +
> +    cooperative_multitasking_destroy();
> +}
> +
> +static void
> +test_cm_update(void)
> +{
> +    struct cooperative_multitasking_callback *cm_entry;
> +    struct fixture_arg arg1 = {
> +        .called = false,
> +    };
> +    struct fixture_arg arg2 = {
> +        .called = false,
> +    };
> +
> +    timeval_stop();
> +    long long int now = time_msec();
> +
> +    /* first register a couple of callbacks. */

Should start with a capital letter.  Same for all other comments in a file.

> +    COOPERATIVE_MULTITASKING_REGISTER(&fixture_run, &arg1, 0, NULL);
> +    COOPERATIVE_MULTITASKING_REGISTER(&fixture_run, &arg2, 0, NULL);
> +
> +    ovs_assert(hmap_count(&cm_callbacks) == 2);
> +
> +    HMAP_FOR_EACH (cm_entry, node, &cm_callbacks) {
> +        if (cm_entry->arg == (void *)&arg1) {
> +            ovs_assert (cm_entry->time_threshold == 0);
> +            ovs_assert (cm_entry->last_run == now);
> +        } else if (cm_entry->arg == (void *)&arg2) {
> +            ovs_assert (cm_entry->time_threshold == 0);
> +            ovs_assert (cm_entry->last_run == now);
> +        } else {
> +            OVS_NOT_REACHED();
> +        }
> +    }
> +
> +    /* update 'last_run' and 'time_threshold' for each callback and validate
> +     * that the correct entry was actually updated. */
> +    COOPERATIVE_MULTITASKING_UPDATE(&fixture_run, &arg1, 1, 2);
> +    COOPERATIVE_MULTITASKING_UPDATE(&fixture_run, &arg2, 3, 4);
> +
> +    HMAP_FOR_EACH (cm_entry, node, &cm_callbacks) {
> +        if (cm_entry->arg == (void *)&arg1) {
> +            ovs_assert (cm_entry->time_threshold == 2);
> +            ovs_assert (cm_entry->last_run == 1);
> +        } else if (cm_entry->arg == (void *)&arg2) {
> +            ovs_assert (cm_entry->time_threshold == 4);
> +            ovs_assert (cm_entry->last_run == 3);
> +        } else {
> +            OVS_NOT_REACHED();
> +        }
> +    }
> +
> +    /* confirm that providing 0 for 'last_run' or 'time_threshold' leaves the
> +     * existing value untouched. */
> +    COOPERATIVE_MULTITASKING_UPDATE(&fixture_run, &arg1, 0, 5);
> +    COOPERATIVE_MULTITASKING_UPDATE(&fixture_run, &arg2, 6, 0);
> +
> +    HMAP_FOR_EACH (cm_entry, node, &cm_callbacks) {
> +        if (cm_entry->arg == (void *)&arg1) {
> +            ovs_assert (cm_entry->time_threshold == 5);
> +            ovs_assert (cm_entry->last_run == 1);
> +        } else if (cm_entry->arg == (void *)&arg2) {
> +            ovs_assert (cm_entry->time_threshold == 4);
> +            ovs_assert (cm_entry->last_run == 6);
> +        } else {
> +            OVS_NOT_REACHED();
> +        }
> +    }
> +
> +    cooperative_multitasking_destroy();
> +}
> +
> +static void
> +test_cm_yield(void)
> +{
> +    struct cooperative_multitasking_callback *cm_entry;
> +    struct fixture_arg arg1 = {
> +        .called = false,
> +    };
> +    struct fixture_arg arg2 = {
> +        .called = false,
> +    };
> +
> +    timeval_stop();
> +    long long int now = time_msec();
> +
> +    /* first register a couple of callbacks. */
> +    COOPERATIVE_MULTITASKING_REGISTER(&fixture_run, &arg1, 1000, NULL);
> +    COOPERATIVE_MULTITASKING_REGISTER(&fixture_run, &arg2, 2000, NULL);
> +
> +    ovs_assert(hmap_count(&cm_callbacks) == 2);
> +
> +    /* call to yield should not execute callbacks until time threshold. */
> +    cooperative_multitasking_yield();
> +    ovs_assert(arg1.called == false);
> +    ovs_assert(arg2.called == false);
> +
> +    HMAP_FOR_EACH (cm_entry, node, &cm_callbacks) {
> +        ovs_assert(cm_entry->last_run == now);
> +    }
> +
> +    /* move clock forward and confirm the expected callbacks to be executed. */
> +    timeval_warp(0, 1000);
> +    timeval_stop();
> +    cooperative_multitasking_yield();
> +    ovs_assert(arg1.called == true);
> +    ovs_assert(arg2.called == false);
> +
> +    /* move clock forward and confirm the expected callbacks to be executed. */
> +    arg1.called = arg2.called = false;
> +    timeval_warp(0, 1000);
> +    timeval_stop();
> +    cooperative_multitasking_yield();
> +    ovs_assert(arg1.called == true);
> +    ovs_assert(arg2.called == true);
> +
> +    timeval_warp(0, 1);
> +    cooperative_multitasking_destroy();
> +}
> +
> +static void
> +fixture_buggy_run(struct fixture_arg *arg)
> +{
> +    COOPERATIVE_MULTITASKING_UPDATE(&fixture_buggy_run, arg, time_msec(), 0);
> +    if (arg) {
> +        arg->called = true;
> +    }
> +    /* A real run function MUST NOT directly or indirectly call yield, this is
> +     * here to test the detection of such a programming error. */
> +    cooperative_multitasking_yield();
> +}
> +
> +static void
> +test_cooperative_multitasking_nested_yield(int argc OVS_UNUSED, char *argv[])
> +{
> +    struct fixture_arg arg1 = {
> +        .called = false,
> +    };
> +
> +    set_program_name(argv[0]);
> +    vlog_set_pattern(VLF_CONSOLE, "%c|%p|%m");
> +    vlog_set_levels(NULL, VLF_SYSLOG, VLL_OFF);
> +
> +    time_msec(); /* ensure timeval is initialized */

Capital letter and a period at the end.

> +    timeval_timewarp_enable();
> +
> +    cooperative_multitasking_init(&cm_callbacks);
> +
> +    COOPERATIVE_MULTITASKING_REGISTER(&fixture_buggy_run, &arg1, 1000, NULL);
> +    timeval_warp(0, 1000);
> +    cooperative_multitasking_yield();
> +    cooperative_multitasking_destroy();
> +}
> +
> +static void
> +test_cooperative_multitasking(int argc OVS_UNUSED, char *argv[] OVS_UNUSED)
> +{
> +    time_msec(); /* ensure timeval is initialized */

Ditto.

> +    timeval_timewarp_enable();
> +
> +    cooperative_multitasking_init(&cm_callbacks);
> +
> +    test_cm_register();
> +    test_cm_update();
> +    test_cm_yield();
> +}
> +
> +OVSTEST_REGISTER("test-cooperative-multitasking",
> +                 test_cooperative_multitasking);
> +OVSTEST_REGISTER("test-cooperative-multitasking-nested-yield",
> +                 test_cooperative_multitasking_nested_yield);
Frode Nordahl Jan. 12, 2024, 11:16 a.m. UTC | #2
On Fri, Jan 12, 2024 at 12:26 AM Ilya Maximets <i.maximets@ovn.org> wrote:
>
> On 1/10/24 20:29, Frode Nordahl wrote:
> > One of the goals of Open vSwitch is to be as resource efficient as
> > possible.  Core parts of the program has been implemented as
> > asynchronous state machines, and when absolutely necessary
> > additional threads are used.
> >
> > Introduce cooperative multitasking module which allow us to
> > interleave important processing with long running tasks while
> > avoiding the additional resource consumption of threads and
> > complexity of asynchronous state machines.
> >
> > We will use this module to ensure long running processing in the
> > OVSDB server does not interfere with stable maintenance of the
> > RAFT cluster in subsequent patches.
> >
> > Suggested-by: Ilya Maximets <i.maximets@ovn.org>
> > Signed-off-by: Frode Nordahl <frode.nordahl@canonical.com>
> > ---
> >  lib/automake.mk                        |   3 +
> >  lib/cooperative-multitasking-private.h |  31 +++
> >  lib/cooperative-multitasking.c         | 195 +++++++++++++++++++
> >  lib/cooperative-multitasking.h         |  42 ++++
> >  tests/automake.mk                      |   1 +
> >  tests/library.at                       |  10 +
> >  tests/ovsdb-server.at                  |   1 +
> >  tests/test-cooperative-multitasking.c  | 259 +++++++++++++++++++++++++
> >  8 files changed, 542 insertions(+)
> >  create mode 100644 lib/cooperative-multitasking-private.h
> >  create mode 100644 lib/cooperative-multitasking.c
> >  create mode 100644 lib/cooperative-multitasking.h
> >  create mode 100644 tests/test-cooperative-multitasking.c
>
> Thanks for working on this!  See some comments inline.

Thank you for providing the counter proposal for this approach, I
think it will turn out much better than the original approach.

> >
> > diff --git a/lib/automake.mk b/lib/automake.mk
> > index 0dc8a35cc..8596171c6 100644
> > --- a/lib/automake.mk
> > +++ b/lib/automake.mk
> > @@ -94,6 +94,9 @@ lib_libopenvswitch_la_SOURCES = \
> >       lib/conntrack-other.c \
> >       lib/conntrack.c \
> >       lib/conntrack.h \
> > +     lib/cooperative-multitasking.c \
> > +     lib/cooperative-multitasking.h \
> > +     lib/cooperative-multitasking-private.h \
> >       lib/coverage.c \
> >       lib/coverage.h \
> >       lib/cpu.c \
> > diff --git a/lib/cooperative-multitasking-private.h b/lib/cooperative-multitasking-private.h
> > new file mode 100644
> > index 000000000..b2e4e7291
> > --- /dev/null
> > +++ b/lib/cooperative-multitasking-private.h
> > @@ -0,0 +1,31 @@
> > +/*
> > + * Copyright (c) 2024 Canonical Ltd.
> > + *
> > + * Licensed under the Apache License, Version 2.0 (the "License");
> > + * you may not use this file except in compliance with the License.
> > + * You may obtain a copy of the License at:
> > + *
> > + *     http://www.apache.org/licenses/LICENSE-2.0
> > + *
> > + * Unless required by applicable law or agreed to in writing, software
> > + * distributed under the License is distributed on an "AS IS" BASIS,
> > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> > + * See the License for the specific language governing permissions and
> > + * limitations under the License.
> > + */
> > +
> > +#ifndef COOPERATIVE_MULTITASKING_PRIVATE_H
> > +#define COOPERATIVE_MULTITASKING_PRIVATE_H 1
> > +
> > +#include "openvswitch/hmap.h"
> > +
> > +struct cooperative_multitasking_callback {
> > +    struct hmap_node node;
> > +    void (*cb)(void *);
> > +    void *arg;
> > +    long long int time_threshold;
> > +    long long int last_run;
> > +    const char *msg;
> > +};
> > +
> > +#endif /* COOPERATIVE_MULTITASKING_PRIVATE_H */
> > diff --git a/lib/cooperative-multitasking.c b/lib/cooperative-multitasking.c
> > new file mode 100644
> > index 000000000..1b1205e8f
> > --- /dev/null
> > +++ b/lib/cooperative-multitasking.c
> > @@ -0,0 +1,195 @@
> > +/*
> > + * Copyright (c) 2023 Canonical Ltd.
>
> 2024 ? :)
>
> Same for other files.

I started the development before the break, so the files were actually
created in 2023, and we are now indeed in 2024 so it makes sense to
update :)

> > + *
> > + * Licensed under the Apache License, Version 2.0 (the "License");
> > + * you may not use this file except in compliance with the License.
> > + * You may obtain a copy of the License at:
> > + *
> > + *     http://www.apache.org/licenses/LICENSE-2.0
> > + *
> > + * Unless required by applicable law or agreed to in writing, software
> > + * distributed under the License is distributed on an "AS IS" BASIS,
> > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> > + * See the License for the specific language governing permissions and
> > + * limitations under the License.
> > + */
> > +
> > +#include <config.h>
> > +
> > +#include "backtrace.h"
> > +#include "cooperative-multitasking-private.h"
> > +#include "cooperative-multitasking.h"
> > +#include "hash.h"
> > +#include "openvswitch/hmap.h"
> > +#include "openvswitch/vlog.h"
> > +#include "timeval.h"
> > +
> > +VLOG_DEFINE_THIS_MODULE(cooperative_multitasking);
> > +
> > +static struct hmap *cooperative_multitasking_callbacks = NULL;
>
> I wonder if we can use a shorter name.  You're using 'cm_callbacks'
> in the test, for example.  It is an internal hash map after all.
> Probably, no need to have an extra fancy name.

I was mostly concerned with consistent namespacing since it is a
"global" variable, as-is it is contained to the module as you point
out.

However, if we're going to share it with the unit test through extern
as proposed below I suggest we keep the long name to avoid potential
symbol clashes?

> > +
> > +/* One time initialization for process that wants to make use of cooperative
> > + * multitasking module.  References to data is stored in 'hmap_container' and
> > + * will be referenced by all calls to this module.  The ownership of the
> > + * container itself remains with the caller while the data in the hmap is owned
> > + * by this module and must be freed with a call to
> > + * cooperative_multitasking_destroy().
> > + *
> > + * The purpose of having the caller own 'hmap_container' is:
> > + * 1) Allow runtime decision whether to use cooperative multitasking without
> > + *    having to pass data between loosely connected parts of a program.  This
> > + *    is useful for the raft code which is consumed by both the ovsdb-server
> > + *    daemon and the ovsdb-tool CLI utility.
> > + * 2) Allow inspection of internal data by unit tests. */
>
> Can we instead define the hash map here and export it as extern in the
> internal header?  It looks a little strange to make a module to hold a
> static hash map.  At the same time no two modules can call init.
> It can also be statically initialized with HMAP_INITIALIZER(), so no
> init function will be necessary.

I was on that path initially, but at the time I did not have the
private header so there was no comfortable way to share it, and I
somehow convinced myself that we needed to conditionally enable this
for the dual use of raft code from ovsdb-server / ovsdb-tool, but
there is no such need.

Will drop _init() and initialize the container in the library instead
as suggested.

> > +void
> > +cooperative_multitasking_init(struct hmap *hmap_container)
> > +{
> > +    cooperative_multitasking_callbacks = hmap_container;
> > +    hmap_init(cooperative_multitasking_callbacks);
> > +}
> > +
> > +/* Register callback 'cb' with argument 'arg' to be called when
> > + * cooperating long running functions yield and 'time_threshold' msec has
> > + * passed since the last call to the function.  If the optional 'msg' is not
> > + * NULL it will be used when logging time threshold overrun conditions.
> > + *
> > + * It is possible to register the same callback multiple times as long as 'arg'
> > + * is different for each registration.  It is up to the caller to ensure no
> > + * unwanted duplicates are registered.
> > + *
> > + * The callback is expected to update the timestamp for last run with a call to
> > + * cooperative_multitasking_update() using the same values for 'cb' and 'arg'.
> > + */
> > +void
> > +cooperative_multitasking_register(void (*cb)(void *), void *arg,
> > +                                  long long int time_threshold,
> > +                                  const char *msg)
> > +{
> > +    if (!cooperative_multitasking_callbacks) {
> > +        return;
> > +    }
> > +
> > +    struct cooperative_multitasking_callback *cm_entry;
>
> The structure is named as 'callback', but you're always using
> 'entry' as a variable name.  Maybe rename the structure?

Ack, I guess we could even shorten it to 'cm_entry' as it's only used
internally.

> > +
> > +    cm_entry = xzalloc(sizeof *cm_entry);
> > +    cm_entry->cb = cb;
> > +    cm_entry->arg = arg;
> > +    cm_entry->time_threshold = time_threshold;
> > +    cm_entry->last_run = time_msec();
> > +    cm_entry->msg = msg;
> > +
> > +    hmap_insert(cooperative_multitasking_callbacks,
> > +                &cm_entry->node,
> > +                hash_pointer(
> > +                    cm_entry->arg ? cm_entry->arg : (void *) cm_entry->cb, 0));
>
> Why hashing the arg instead of callback?  I suppose that makes
> sense in context that we'll have only one user with a single
> function and different arguments.  But it seems backwards from
> the point of view of a library.
>
> In practice, ovsdb-server usually serves only one clustered
> database, so there should be no difference either way.
> If it's 2 databases, should also not be a problem.  But hashing
> the callback seems more logical.
>
> What do you think?

Ack, I guess you caught me in premature/unneeded optimization, it's
not like there will be thousands of entries for the same value of 'cb'
with different 'arg'.

>
> > +}
> > +
> > +/* Free any data allocated by calls to cooperative_multitasking_register(). */
> > +void
> > +cooperative_multitasking_destroy(void)
> > +{
> > +    struct cooperative_multitasking_callback *cm_entry;
> > +    HMAP_FOR_EACH_SAFE (cm_entry, node, cooperative_multitasking_callbacks) {
> > +        hmap_remove(cooperative_multitasking_callbacks, &cm_entry->node);
> > +        free(cm_entry);
> > +    }
> > +}
> > +
> > +/* Update data for already registered callback identified by 'cb' and 'arg'.
> > + *
> > + * The value for 'last_run' must at a minimal be updated each time the callback
> > + * is run.  It can also be useful to update for multiple entry points to the
> > + * part serviced by the callback to avoid unnecessary callbacks on next call to
> > + * cooperative_multitasking_yield().
>
> I'm not sure I understand this last sentence.

It's indeed a bit convoluted, so I'll reword it.

> > + *
> > + * Updating the value for 'time_threshold' may be necessary as a consequence of
> > + * the change in runtime configuration or requirements of the serviced
> > + * callback.
> > + *
> > + * Providing a value of 0 for 'last_run' or 'time_threshold' will result in
> > + * the respective stored value left untouched. */
> > +void
> > +cooperative_multitasking_update(void (*cb)(void *), void *arg,
> > +                                long long int last_run,
> > +                                long long int time_threshold)
> > +{
> > +    if (!cooperative_multitasking_callbacks) {
> > +        return;
> > +    }
> > +
> > +    struct cooperative_multitasking_callback *cm_entry;
> > +
> > +    HMAP_FOR_EACH_WITH_HASH (cm_entry, node,
> > +                             hash_pointer(arg ? arg : (void *) cb, 0),
> > +                             cooperative_multitasking_callbacks)
> > +    {
> > +        if (cm_entry->cb == cb && cm_entry->arg == arg) {
> > +            if (last_run) {
> > +                cm_entry->last_run = last_run;
> > +            }
> > +
> > +            if (time_threshold) {
> > +                cm_entry->time_threshold = time_threshold;
> > +            }
> > +            return;
>
> The register() function doesn't check for duplicates, so there
> can be duplicates if used inaccurately and they will not be
> updated.
>
> Should we just have a single function like _set() that would
> find and update or create if not found?  Instead of relying on
> users to not add the same thing twice.

Yes, that makes sense. The fact that no duplicate check is done was
documented, but condensing this to a single _set() will most likely
make that behavior even more clear to the caller.

> > +        }
> > +    }
> > +}
> > +
> > +static void
> > +cooperative_multitasking_yield_at__(const char *source_location)
> > +{
> > +    long long int now = time_msec();
> > +    struct cooperative_multitasking_callback *cm_entry;
>
> Reverse x-mass tree.

Ack.

> > +
> > +    HMAP_FOR_EACH (cm_entry, node, cooperative_multitasking_callbacks) {
> > +        long long int elapsed = now - cm_entry->last_run;
> > +
> > +        if (elapsed >= cm_entry->time_threshold) {
> > +            VLOG_DBG("yield called from %s: "
> > +                     "%lld: %lld >= %lld, executing %p(%p)",
> > +                     source_location, now, elapsed, cm_entry->time_threshold,
> > +                     cm_entry->cb, cm_entry->arg);
>
> Maybe an empty line here.
> Also, the pointers are not very informative.  Should something
> like 'name' be added to the structure?  So, it will be %s(%p).
>
> Not sure why logging 'now'.  Maybe re-organize things a little:
>
> "%{source}: yield for %{name}: elapsed(%lld) >= threshold(%lld), overrun: %lld"
>
> What do you think?
>
> > +            (*cm_entry->cb)(cm_entry->arg);
> > +            if (elapsed - cm_entry->time_threshold >
> > +                cm_entry->time_threshold / 8)
>
> Callback updates thresholds, so the check should be done before
> calling it.   Maybe 'time_' part of the structure filed is
> redundant.  Without it the lines will be shorter.

Thank you for pointing out that the callback can update the threshold,
that fact eluded me in some iteration.

My original thought for splitting this across two calls was to avoid
delaying the callback even further by spending time on producing the
backtrace. But I guess it's not that expensive and it is only
performed during debugging.  Doing it before also avoids polluting the
backtrace with the call to the callback.

> > +            {
>
> Should be on a previous line.
>
> > +                VLOG_WARN("yield threshold overrun with %lld msec. %s",
> > +                          elapsed - cm_entry->time_threshold,
> > +                          cm_entry->msg ? cm_entry->msg : "");
>
> If we add a name, we should print the callback name here as well.
> And we may even drop the 'msg', because it will be more or less
> clear what went wrong if we know that it is a raft_run, for example.
>
> Also, we could combine two log calls into one by using base VLOG()
> macro and choosing the log level conditionally, e.g.:
>
>     bool warn = elapsed - entry->threshold > entry->threshold / 8;
>
>     VLOG(warn ? VLL_WARN : VLL_DBG, ... );
>     if (warn && VLOG_IS_DBG_ENABLED()) {
>         backtrace
>     }
>     entry->cb();

Yes, that is indeed much nicer, thx!

> > +                if (VLOG_IS_DBG_ENABLED()) {
> > +                    /* log_backtrace() logs at ERROR level but we only want to
> > +                     * log a backtrace when DEBUG is enabled */
> > +                    log_backtrace();
> > +                }
> > +            }
>
> Should we also update 'now'?  The callback itself might have taken
> some time.

Yes, good point.

> > +        }
> > +    }
>
> Maybe log a warning if overall yield run took more than, let's say, a second?

Ack.

> > +}
> > +
> > +/* Iterate over registered callbacks and execute callbacks as demanded by the
> > + * recorded time threshold. */
> > +void
> > +cooperative_multitasking_yield_at(const char *source_location)
> > +{
> > +    static bool yield_in_progress = false;
> > +
> > +    if (!cooperative_multitasking_callbacks) {
> > +        return;
> > +    }
> > +
> > +    if (yield_in_progress) {
> > +        VLOG_ERR_ONCE("nested yield avoided, this is a bug! "
> > +                      "enable debug logging for more details.");
>
> Might be better to capitalize the first letters in sentences.
> The second one is bothering me :) , but if we capitalize the
> second, then we also need to do the first.

Ack.

> > +        if (VLOG_IS_DBG_ENABLED()) {
> > +            VLOG_DBG("nested yield, called from %s", source_location);
> > +            log_backtrace();
> > +        }
> > +        return;
> > +    }
> > +    yield_in_progress = true;
> > +
> > +    cooperative_multitasking_yield_at__(source_location);
> > +
> > +    yield_in_progress = false;
> > +}
> > diff --git a/lib/cooperative-multitasking.h b/lib/cooperative-multitasking.h
> > new file mode 100644
> > index 000000000..6286bfbf5
> > --- /dev/null
> > +++ b/lib/cooperative-multitasking.h
> > @@ -0,0 +1,42 @@
> > +/*
> > + * Copyright (c) 2023 Canonical Ltd.
> > + *
> > + * Licensed under the Apache License, Version 2.0 (the "License");
> > + * you may not use this file except in compliance with the License.
> > + * You may obtain a copy of the License at:
> > + *
> > + *     http://www.apache.org/licenses/LICENSE-2.0
> > + *
> > + * Unless required by applicable law or agreed to in writing, software
> > + * distributed under the License is distributed on an "AS IS" BASIS,
> > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> > + * See the License for the specific language governing permissions and
> > + * limitations under the License.
> > + */
> > +
> > +#ifndef COOPERATIVE_MULTITASKING_H
> > +#define COOPERATIVE_MULTITASKING_H 1
> > +
>
> I think, we need a comment here describing the library and some use cases.
> At least a little bit.  It should also have a thread safety section
> clearly stating that the library is not thread safe can only be used from
> one thread.

Ack.

> > +struct hmap;
> > +
> > +void cooperative_multitasking_init(struct hmap *);
> > +
> > +void cooperative_multitasking_register(void (*)(void *), void *,
> > +                                       long long int, const char *);
> > +#define COOPERATIVE_MULTITASKING_REGISTER(CB, ARG, TIME_THRESHOLD, MSG)       \
> > +    cooperative_multitasking_register((void (*)(void *)) CB, (void *) ARG,    \
> > +                                      TIME_THRESHOLD, MSG)
> > +
> > +void cooperative_multitasking_destroy(void);
>
> We probably need an unregister/remove-style function as well.
> For example, if we remove clustered database in runtime, we need
> to make sure the callback will not be called for a destroyed
> raft structure.

Yes, that is a good point. I'll add that.

> > +
> > +void cooperative_multitasking_update(void (*)(void *), void *, long long int,
> > +                                     long long int);
> > +#define COOPERATIVE_MULTITASKING_UPDATE(CB, ARG, LAST_RUN, TIME_THRESHOLD)    \
> > +    cooperative_multitasking_update((void (*) (void *)) CB, (void *) ARG,     \
> > +                                    LAST_RUN, TIME_THRESHOLD)
>
> I don't think we should have these macros.  I see that their purpose
> is to hide pointer conversions, but unfortunately we can't really do that.
> We need to make sure that all the callbacks actually have void (*) (void *))
> prototypes and the arguments of these callbacks are actually void pointers.
>
> The problem is that llvm devs decided that it is an undefined behavior
> to call functions via pointer with a different type:
>   http://reviews.llvm.org/D148827#4379764
> And now UBSAN in clang-17 is complaining all over the place for multiple
> projects:
>   https://github.com/systemd/systemd/issues/29972
>   https://github.com/openssl/openssl/issues/22896
>
> This also affects OVS in rcu callbacks, but we didn't get around fixing
> this yet, as release right now is much more important.
> However, it'll be better to not introduce more issues like this.
>
> Unfortunately we'll likely have to have a wrapper function like:
>
>   void raft_run_wrap(void *arg)
>   {
>       struct raft *raft = (struct raft *) arg;
>
>       raft_run(raft);
>   }
>
> And use that as a callback...  It is ugly and looks very unnecessary,
> but llvm folks are not backing down from this and it looks like the only
> option for generic libraries like RCU or coop-multitasking.

Thank you for elaborating on this, I was unaware of this looming
compiler change on the horizon.

> > +
> > +void cooperative_multitasking_yield_at(const char *);
> > +#define cooperative_multitasking_yield() \
> > +    cooperative_multitasking_yield_at(OVS_SOURCE_LOCATOR)
> > +
> > +#endif /* COOPERATIVE_MULTITASKING_H */
> > diff --git a/tests/automake.mk b/tests/automake.mk
> > index 10c9fbb01..08c9b74d4 100644
> > --- a/tests/automake.mk
> > +++ b/tests/automake.mk
> > @@ -456,6 +456,7 @@ tests_ovstest_SOURCES = \
> >       tests/test-ccmap.c \
> >       tests/test-cmap.c \
> >       tests/test-conntrack.c \
> > +     tests/test-cooperative-multitasking.c \
> >       tests/test-csum.c \
> >       tests/test-flows.c \
> >       tests/test-hash.c \
> > diff --git a/tests/library.at b/tests/library.at
> > index 3f9df2f87..77d5abb01 100644
> > --- a/tests/library.at
> > +++ b/tests/library.at
> > @@ -296,3 +296,13 @@ AT_CLEANUP
> >  AT_SETUP([uuidset module])
> >  AT_CHECK([ovstest test-uuidset], [0], [], [ignore])
> >  AT_CLEANUP
> > +
> > +AT_SETUP([cooperative-multitasking module])
> > +AT_CHECK([ovstest test-cooperative-multitasking], [0], [])
> > +AT_CLEANUP
> > +
> > +AT_SETUP([cooperative-multitasking module nested yield detection])
> > +AT_CHECK([ovstest test-cooperative-multitasking-nested-yield], [0], [], [dnl
> > +cooperative_multitasking|ERR|nested yield avoided, this is a bug! enable debug logging for more details.
> > +])
> > +AT_CLEANUP
> > diff --git a/tests/ovsdb-server.at b/tests/ovsdb-server.at
> > index 6eb758e22..88a9a9a27 100644
> > --- a/tests/ovsdb-server.at
> > +++ b/tests/ovsdb-server.at
> > @@ -2387,6 +2387,7 @@ m4_define([CLEAN_LOG_FILE],
> >    [sed 's/[[0-9\-]]*T[[0-9:\.]]*Z|[[0-9]]*\(|.*$\)/\1/g' $1 | dnl
> >     sed '/|poll_loop|/d' |   dnl
> >     sed '/|socket_util|/d' | dnl
> > +   sed '/|cooperative_multitasking|DBG|/d' | dnl
> >     sed 's/[[0-9]]*\.ctl/<cleared>\.ctl/g'> $2])
> >
> >  CLEAN_LOG_FILE([1.log], [1.log.clear])
> > diff --git a/tests/test-cooperative-multitasking.c b/tests/test-cooperative-multitasking.c
> > new file mode 100644
> > index 000000000..ec6be865f
> > --- /dev/null
> > +++ b/tests/test-cooperative-multitasking.c
> > @@ -0,0 +1,259 @@
> > +/*
> > + * Copyright (c) 2023 Canonical Ltd.
> > + *
> > + * Licensed under the Apache License, Version 2.0 (the "License");
> > + * you may not use this file except in compliance with the License.
> > + * You may obtain a copy of the License at:
> > + *
> > + *     http://www.apache.org/licenses/LICENSE-2.0
> > + *
> > + * Unless required by applicable law or agreed to in writing, software
> > + * distributed under the License is distributed on an "AS IS" BASIS,
> > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> > + * See the License for the specific language governing permissions and
> > + * limitations under the License.
> > + */
> > +
> > +#include <config.h>
> > +#undef NDEBUG
> > +#include "cooperative-multitasking.h"
> > +#include "cooperative-multitasking-private.h"
> > +#include "openvswitch/hmap.h"
> > +#include "ovstest.h"
> > +#include "timeval.h"
> > +#include "util.h"
> > +#include "openvswitch/vlog.h"
> > +
> > +static struct hmap cm_callbacks;
> > +
> > +struct fixture_arg {
> > +    bool called;
> > +};
> > +
> > +static void
> > +fixture_run(struct fixture_arg *arg)
>
> These will have to have a void * arguments...

I'll add the wrap functions to the test as well.

> > +{
> > +    COOPERATIVE_MULTITASKING_UPDATE(&fixture_run, arg, time_msec(), 0);
> > +    if (arg) {
> > +        arg->called = true;
> > +    }
> > +}
> > +
> > +static void
> > +fixture_other_run(struct fixture_arg *arg)
> > +{
> > +    COOPERATIVE_MULTITASKING_UPDATE(&fixture_other_run, arg, time_msec(), 0);
> > +    if (arg) {
> > +        arg->called = true;
> > +    }
> > +}
> > +
> > +static void
> > +test_cm_register(void)
> > +{
> > +    struct cooperative_multitasking_callback *cm_entry;
> > +    struct fixture_arg arg1 = {
> > +        .called = false,
> > +    };
> > +    struct fixture_arg arg2 = {
> > +        .called = false,
> > +    };
> > +
> > +    timeval_stop();
> > +    long long int now = time_msec();
> > +
> > +    COOPERATIVE_MULTITASKING_REGISTER(&fixture_run, &arg1, 1000, NULL);
> > +    COOPERATIVE_MULTITASKING_REGISTER(&fixture_run, &arg2, 2000, NULL);
> > +    COOPERATIVE_MULTITASKING_REGISTER(&fixture_other_run, NULL, 3000, NULL);
> > +
> > +    ovs_assert(hmap_count(&cm_callbacks) == 3);
> > +
> > +    HMAP_FOR_EACH (cm_entry, node, &cm_callbacks) {
> > +        if (cm_entry->arg == (void *)&arg1) {
> > +            ovs_assert (cm_entry->cb == (void (*)(void *)) &fixture_run);
> > +            ovs_assert (cm_entry->time_threshold == 1000);
> > +            ovs_assert (cm_entry->last_run == now);
> > +        } else if (cm_entry->arg == (void *)&arg2) {
> > +            ovs_assert (cm_entry->cb == (void (*)(void *)) &fixture_run);
> > +            ovs_assert (cm_entry->time_threshold == 2000);
> > +            ovs_assert (cm_entry->last_run == now);
> > +        } else if (cm_entry->cb == (void (*)(void *)) &fixture_other_run) {
> > +            ovs_assert (cm_entry->arg == NULL);
> > +            ovs_assert (cm_entry->time_threshold == 3000);
> > +            ovs_assert (cm_entry->last_run == now);
>
> Nit: no space after assert, but a space between the type and a
> variable in a cast.  Same for other similar parts of the test.

Ack.

> > +        } else {
> > +            OVS_NOT_REACHED();
> > +        }
> > +    }
> > +
> > +    cooperative_multitasking_destroy();
> > +}
> > +
> > +static void
> > +test_cm_update(void)
> > +{
> > +    struct cooperative_multitasking_callback *cm_entry;
> > +    struct fixture_arg arg1 = {
> > +        .called = false,
> > +    };
> > +    struct fixture_arg arg2 = {
> > +        .called = false,
> > +    };
> > +
> > +    timeval_stop();
> > +    long long int now = time_msec();
> > +
> > +    /* first register a couple of callbacks. */
>
> Should start with a capital letter.  Same for all other comments in a file.

Ack.

> > +    COOPERATIVE_MULTITASKING_REGISTER(&fixture_run, &arg1, 0, NULL);
> > +    COOPERATIVE_MULTITASKING_REGISTER(&fixture_run, &arg2, 0, NULL);
> > +
> > +    ovs_assert(hmap_count(&cm_callbacks) == 2);
> > +
> > +    HMAP_FOR_EACH (cm_entry, node, &cm_callbacks) {
> > +        if (cm_entry->arg == (void *)&arg1) {
> > +            ovs_assert (cm_entry->time_threshold == 0);
> > +            ovs_assert (cm_entry->last_run == now);
> > +        } else if (cm_entry->arg == (void *)&arg2) {
> > +            ovs_assert (cm_entry->time_threshold == 0);
> > +            ovs_assert (cm_entry->last_run == now);
> > +        } else {
> > +            OVS_NOT_REACHED();
> > +        }
> > +    }
> > +
> > +    /* update 'last_run' and 'time_threshold' for each callback and validate
> > +     * that the correct entry was actually updated. */
> > +    COOPERATIVE_MULTITASKING_UPDATE(&fixture_run, &arg1, 1, 2);
> > +    COOPERATIVE_MULTITASKING_UPDATE(&fixture_run, &arg2, 3, 4);
> > +
> > +    HMAP_FOR_EACH (cm_entry, node, &cm_callbacks) {
> > +        if (cm_entry->arg == (void *)&arg1) {
> > +            ovs_assert (cm_entry->time_threshold == 2);
> > +            ovs_assert (cm_entry->last_run == 1);
> > +        } else if (cm_entry->arg == (void *)&arg2) {
> > +            ovs_assert (cm_entry->time_threshold == 4);
> > +            ovs_assert (cm_entry->last_run == 3);
> > +        } else {
> > +            OVS_NOT_REACHED();
> > +        }
> > +    }
> > +
> > +    /* confirm that providing 0 for 'last_run' or 'time_threshold' leaves the
> > +     * existing value untouched. */
> > +    COOPERATIVE_MULTITASKING_UPDATE(&fixture_run, &arg1, 0, 5);
> > +    COOPERATIVE_MULTITASKING_UPDATE(&fixture_run, &arg2, 6, 0);
> > +
> > +    HMAP_FOR_EACH (cm_entry, node, &cm_callbacks) {
> > +        if (cm_entry->arg == (void *)&arg1) {
> > +            ovs_assert (cm_entry->time_threshold == 5);
> > +            ovs_assert (cm_entry->last_run == 1);
> > +        } else if (cm_entry->arg == (void *)&arg2) {
> > +            ovs_assert (cm_entry->time_threshold == 4);
> > +            ovs_assert (cm_entry->last_run == 6);
> > +        } else {
> > +            OVS_NOT_REACHED();
> > +        }
> > +    }
> > +
> > +    cooperative_multitasking_destroy();
> > +}
> > +
> > +static void
> > +test_cm_yield(void)
> > +{
> > +    struct cooperative_multitasking_callback *cm_entry;
> > +    struct fixture_arg arg1 = {
> > +        .called = false,
> > +    };
> > +    struct fixture_arg arg2 = {
> > +        .called = false,
> > +    };
> > +
> > +    timeval_stop();
> > +    long long int now = time_msec();
> > +
> > +    /* first register a couple of callbacks. */
> > +    COOPERATIVE_MULTITASKING_REGISTER(&fixture_run, &arg1, 1000, NULL);
> > +    COOPERATIVE_MULTITASKING_REGISTER(&fixture_run, &arg2, 2000, NULL);
> > +
> > +    ovs_assert(hmap_count(&cm_callbacks) == 2);
> > +
> > +    /* call to yield should not execute callbacks until time threshold. */
> > +    cooperative_multitasking_yield();
> > +    ovs_assert(arg1.called == false);
> > +    ovs_assert(arg2.called == false);
> > +
> > +    HMAP_FOR_EACH (cm_entry, node, &cm_callbacks) {
> > +        ovs_assert(cm_entry->last_run == now);
> > +    }
> > +
> > +    /* move clock forward and confirm the expected callbacks to be executed. */
> > +    timeval_warp(0, 1000);
> > +    timeval_stop();
> > +    cooperative_multitasking_yield();
> > +    ovs_assert(arg1.called == true);
> > +    ovs_assert(arg2.called == false);
> > +
> > +    /* move clock forward and confirm the expected callbacks to be executed. */
> > +    arg1.called = arg2.called = false;
> > +    timeval_warp(0, 1000);
> > +    timeval_stop();
> > +    cooperative_multitasking_yield();
> > +    ovs_assert(arg1.called == true);
> > +    ovs_assert(arg2.called == true);
> > +
> > +    timeval_warp(0, 1);
> > +    cooperative_multitasking_destroy();
> > +}
> > +
> > +static void
> > +fixture_buggy_run(struct fixture_arg *arg)
> > +{
> > +    COOPERATIVE_MULTITASKING_UPDATE(&fixture_buggy_run, arg, time_msec(), 0);
> > +    if (arg) {
> > +        arg->called = true;
> > +    }
> > +    /* A real run function MUST NOT directly or indirectly call yield, this is
> > +     * here to test the detection of such a programming error. */
> > +    cooperative_multitasking_yield();
> > +}
> > +
> > +static void
> > +test_cooperative_multitasking_nested_yield(int argc OVS_UNUSED, char *argv[])
> > +{
> > +    struct fixture_arg arg1 = {
> > +        .called = false,
> > +    };
> > +
> > +    set_program_name(argv[0]);
> > +    vlog_set_pattern(VLF_CONSOLE, "%c|%p|%m");
> > +    vlog_set_levels(NULL, VLF_SYSLOG, VLL_OFF);
> > +
> > +    time_msec(); /* ensure timeval is initialized */
>
> Capital letter and a period at the end.

Ack.

> > +    timeval_timewarp_enable();
> > +
> > +    cooperative_multitasking_init(&cm_callbacks);
> > +
> > +    COOPERATIVE_MULTITASKING_REGISTER(&fixture_buggy_run, &arg1, 1000, NULL);
> > +    timeval_warp(0, 1000);
> > +    cooperative_multitasking_yield();
> > +    cooperative_multitasking_destroy();
> > +}
> > +
> > +static void
> > +test_cooperative_multitasking(int argc OVS_UNUSED, char *argv[] OVS_UNUSED)
> > +{
> > +    time_msec(); /* ensure timeval is initialized */
>
> Ditto.

Ack.

> > +    timeval_timewarp_enable();
> > +
> > +    cooperative_multitasking_init(&cm_callbacks);
> > +
> > +    test_cm_register();
> > +    test_cm_update();
> > +    test_cm_yield();
> > +}
> > +
> > +OVSTEST_REGISTER("test-cooperative-multitasking",
> > +                 test_cooperative_multitasking);
> > +OVSTEST_REGISTER("test-cooperative-multitasking-nested-yield",
> > +                 test_cooperative_multitasking_nested_yield);
>
Ilya Maximets Jan. 12, 2024, 2:11 p.m. UTC | #3
On 1/12/24 12:16, Frode Nordahl wrote:
> On Fri, Jan 12, 2024 at 12:26 AM Ilya Maximets <i.maximets@ovn.org> wrote:
>>
>> On 1/10/24 20:29, Frode Nordahl wrote:
>>> One of the goals of Open vSwitch is to be as resource efficient as
>>> possible.  Core parts of the program has been implemented as
>>> asynchronous state machines, and when absolutely necessary
>>> additional threads are used.
>>>
>>> Introduce cooperative multitasking module which allow us to
>>> interleave important processing with long running tasks while
>>> avoiding the additional resource consumption of threads and
>>> complexity of asynchronous state machines.
>>>
>>> We will use this module to ensure long running processing in the
>>> OVSDB server does not interfere with stable maintenance of the
>>> RAFT cluster in subsequent patches.
>>>
>>> Suggested-by: Ilya Maximets <i.maximets@ovn.org>
>>> Signed-off-by: Frode Nordahl <frode.nordahl@canonical.com>
>>> ---
>>>  lib/automake.mk                        |   3 +
>>>  lib/cooperative-multitasking-private.h |  31 +++
>>>  lib/cooperative-multitasking.c         | 195 +++++++++++++++++++
>>>  lib/cooperative-multitasking.h         |  42 ++++
>>>  tests/automake.mk                      |   1 +
>>>  tests/library.at                       |  10 +
>>>  tests/ovsdb-server.at                  |   1 +
>>>  tests/test-cooperative-multitasking.c  | 259 +++++++++++++++++++++++++
>>>  8 files changed, 542 insertions(+)
>>>  create mode 100644 lib/cooperative-multitasking-private.h
>>>  create mode 100644 lib/cooperative-multitasking.c
>>>  create mode 100644 lib/cooperative-multitasking.h
>>>  create mode 100644 tests/test-cooperative-multitasking.c
>>
>> Thanks for working on this!  See some comments inline.
> 
> Thank you for providing the counter proposal for this approach, I
> think it will turn out much better than the original approach.
> 
>>>
>>> diff --git a/lib/automake.mk b/lib/automake.mk
>>> index 0dc8a35cc..8596171c6 100644
>>> --- a/lib/automake.mk
>>> +++ b/lib/automake.mk
>>> @@ -94,6 +94,9 @@ lib_libopenvswitch_la_SOURCES = \
>>>       lib/conntrack-other.c \
>>>       lib/conntrack.c \
>>>       lib/conntrack.h \
>>> +     lib/cooperative-multitasking.c \
>>> +     lib/cooperative-multitasking.h \
>>> +     lib/cooperative-multitasking-private.h \
>>>       lib/coverage.c \
>>>       lib/coverage.h \
>>>       lib/cpu.c \
>>> diff --git a/lib/cooperative-multitasking-private.h b/lib/cooperative-multitasking-private.h
>>> new file mode 100644
>>> index 000000000..b2e4e7291
>>> --- /dev/null
>>> +++ b/lib/cooperative-multitasking-private.h
>>> @@ -0,0 +1,31 @@
>>> +/*
>>> + * Copyright (c) 2024 Canonical Ltd.
>>> + *
>>> + * Licensed under the Apache License, Version 2.0 (the "License");
>>> + * you may not use this file except in compliance with the License.
>>> + * You may obtain a copy of the License at:
>>> + *
>>> + *     http://www.apache.org/licenses/LICENSE-2.0
>>> + *
>>> + * Unless required by applicable law or agreed to in writing, software
>>> + * distributed under the License is distributed on an "AS IS" BASIS,
>>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>>> + * See the License for the specific language governing permissions and
>>> + * limitations under the License.
>>> + */
>>> +
>>> +#ifndef COOPERATIVE_MULTITASKING_PRIVATE_H
>>> +#define COOPERATIVE_MULTITASKING_PRIVATE_H 1
>>> +
>>> +#include "openvswitch/hmap.h"
>>> +
>>> +struct cooperative_multitasking_callback {
>>> +    struct hmap_node node;
>>> +    void (*cb)(void *);
>>> +    void *arg;
>>> +    long long int time_threshold;
>>> +    long long int last_run;
>>> +    const char *msg;
>>> +};
>>> +
>>> +#endif /* COOPERATIVE_MULTITASKING_PRIVATE_H */
>>> diff --git a/lib/cooperative-multitasking.c b/lib/cooperative-multitasking.c
>>> new file mode 100644
>>> index 000000000..1b1205e8f
>>> --- /dev/null
>>> +++ b/lib/cooperative-multitasking.c
>>> @@ -0,0 +1,195 @@
>>> +/*
>>> + * Copyright (c) 2023 Canonical Ltd.
>>
>> 2024 ? :)
>>
>> Same for other files.
> 
> I started the development before the break, so the files were actually
> created in 2023, and we are now indeed in 2024 so it makes sense to
> update :)
> 
>>> + *
>>> + * Licensed under the Apache License, Version 2.0 (the "License");
>>> + * you may not use this file except in compliance with the License.
>>> + * You may obtain a copy of the License at:
>>> + *
>>> + *     http://www.apache.org/licenses/LICENSE-2.0
>>> + *
>>> + * Unless required by applicable law or agreed to in writing, software
>>> + * distributed under the License is distributed on an "AS IS" BASIS,
>>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>>> + * See the License for the specific language governing permissions and
>>> + * limitations under the License.
>>> + */
>>> +
>>> +#include <config.h>
>>> +
>>> +#include "backtrace.h"
>>> +#include "cooperative-multitasking-private.h"
>>> +#include "cooperative-multitasking.h"
>>> +#include "hash.h"
>>> +#include "openvswitch/hmap.h"
>>> +#include "openvswitch/vlog.h"
>>> +#include "timeval.h"
>>> +
>>> +VLOG_DEFINE_THIS_MODULE(cooperative_multitasking);
>>> +
>>> +static struct hmap *cooperative_multitasking_callbacks = NULL;
>>
>> I wonder if we can use a shorter name.  You're using 'cm_callbacks'
>> in the test, for example.  It is an internal hash map after all.
>> Probably, no need to have an extra fancy name.
> 
> I was mostly concerned with consistent namespacing since it is a
> "global" variable, as-is it is contained to the module as you point
> out.
> 
> However, if we're going to share it with the unit test through extern
> as proposed below I suggest we keep the long name to avoid potential
> symbol clashes?

I'm not concerned much about symbol clashes, since the header must
not be included anywhere, except for the module itself and the test
code.  The test already knows too much about internals of the module,
so we can expect it to be aware of the internal naming.  But it is
up to you.

> 
>>> +
>>> +/* One time initialization for process that wants to make use of cooperative
>>> + * multitasking module.  References to data is stored in 'hmap_container' and
>>> + * will be referenced by all calls to this module.  The ownership of the
>>> + * container itself remains with the caller while the data in the hmap is owned
>>> + * by this module and must be freed with a call to
>>> + * cooperative_multitasking_destroy().
>>> + *
>>> + * The purpose of having the caller own 'hmap_container' is:
>>> + * 1) Allow runtime decision whether to use cooperative multitasking without
>>> + *    having to pass data between loosely connected parts of a program.  This
>>> + *    is useful for the raft code which is consumed by both the ovsdb-server
>>> + *    daemon and the ovsdb-tool CLI utility.
>>> + * 2) Allow inspection of internal data by unit tests. */
>>
>> Can we instead define the hash map here and export it as extern in the
>> internal header?  It looks a little strange to make a module to hold a
>> static hash map.  At the same time no two modules can call init.
>> It can also be statically initialized with HMAP_INITIALIZER(), so no
>> init function will be necessary.
> 
> I was on that path initially, but at the time I did not have the
> private header so there was no comfortable way to share it, and I
> somehow convinced myself that we needed to conditionally enable this
> for the dual use of raft code from ovsdb-server / ovsdb-tool, but
> there is no such need.
> 
> Will drop _init() and initialize the container in the library instead
> as suggested.
> 
>>> +void
>>> +cooperative_multitasking_init(struct hmap *hmap_container)
>>> +{
>>> +    cooperative_multitasking_callbacks = hmap_container;
>>> +    hmap_init(cooperative_multitasking_callbacks);
>>> +}
>>> +
>>> +/* Register callback 'cb' with argument 'arg' to be called when
>>> + * cooperating long running functions yield and 'time_threshold' msec has
>>> + * passed since the last call to the function.  If the optional 'msg' is not
>>> + * NULL it will be used when logging time threshold overrun conditions.
>>> + *
>>> + * It is possible to register the same callback multiple times as long as 'arg'
>>> + * is different for each registration.  It is up to the caller to ensure no
>>> + * unwanted duplicates are registered.
>>> + *
>>> + * The callback is expected to update the timestamp for last run with a call to
>>> + * cooperative_multitasking_update() using the same values for 'cb' and 'arg'.
>>> + */
>>> +void
>>> +cooperative_multitasking_register(void (*cb)(void *), void *arg,
>>> +                                  long long int time_threshold,
>>> +                                  const char *msg)
>>> +{
>>> +    if (!cooperative_multitasking_callbacks) {
>>> +        return;
>>> +    }
>>> +
>>> +    struct cooperative_multitasking_callback *cm_entry;
>>
>> The structure is named as 'callback', but you're always using
>> 'entry' as a variable name.  Maybe rename the structure?
> 
> Ack, I guess we could even shorten it to 'cm_entry' as it's only used
> internally.

Agreed.

> 
>>> +
>>> +    cm_entry = xzalloc(sizeof *cm_entry);
>>> +    cm_entry->cb = cb;
>>> +    cm_entry->arg = arg;
>>> +    cm_entry->time_threshold = time_threshold;
>>> +    cm_entry->last_run = time_msec();
>>> +    cm_entry->msg = msg;
>>> +
>>> +    hmap_insert(cooperative_multitasking_callbacks,
>>> +                &cm_entry->node,
>>> +                hash_pointer(
>>> +                    cm_entry->arg ? cm_entry->arg : (void *) cm_entry->cb, 0));
>>
>> Why hashing the arg instead of callback?  I suppose that makes
>> sense in context that we'll have only one user with a single
>> function and different arguments.  But it seems backwards from
>> the point of view of a library.
>>
>> In practice, ovsdb-server usually serves only one clustered
>> database, so there should be no difference either way.
>> If it's 2 databases, should also not be a problem.  But hashing
>> the callback seems more logical.
>>
>> What do you think?
> 
> Ack, I guess you caught me in premature/unneeded optimization, it's
> not like there will be thousands of entries for the same value of 'cb'
> with different 'arg'.

I certainly hope so. :)  We would need to hash both in this case,
I guess.

> 
>>
>>> +}
>>> +
>>> +/* Free any data allocated by calls to cooperative_multitasking_register(). */
>>> +void
>>> +cooperative_multitasking_destroy(void)
>>> +{
>>> +    struct cooperative_multitasking_callback *cm_entry;
>>> +    HMAP_FOR_EACH_SAFE (cm_entry, node, cooperative_multitasking_callbacks) {
>>> +        hmap_remove(cooperative_multitasking_callbacks, &cm_entry->node);
>>> +        free(cm_entry);
>>> +    }
>>> +}
>>> +
>>> +/* Update data for already registered callback identified by 'cb' and 'arg'.
>>> + *
>>> + * The value for 'last_run' must at a minimal be updated each time the callback
>>> + * is run.  It can also be useful to update for multiple entry points to the
>>> + * part serviced by the callback to avoid unnecessary callbacks on next call to
>>> + * cooperative_multitasking_yield().
>>
>> I'm not sure I understand this last sentence.
> 
> It's indeed a bit convoluted, so I'll reword it.

Thanks!

> 
>>> + *
>>> + * Updating the value for 'time_threshold' may be necessary as a consequence of
>>> + * the change in runtime configuration or requirements of the serviced
>>> + * callback.
>>> + *
>>> + * Providing a value of 0 for 'last_run' or 'time_threshold' will result in
>>> + * the respective stored value left untouched. */
>>> +void
>>> +cooperative_multitasking_update(void (*cb)(void *), void *arg,
>>> +                                long long int last_run,
>>> +                                long long int time_threshold)
>>> +{
>>> +    if (!cooperative_multitasking_callbacks) {
>>> +        return;
>>> +    }
>>> +
>>> +    struct cooperative_multitasking_callback *cm_entry;
>>> +
>>> +    HMAP_FOR_EACH_WITH_HASH (cm_entry, node,
>>> +                             hash_pointer(arg ? arg : (void *) cb, 0),
>>> +                             cooperative_multitasking_callbacks)
>>> +    {
>>> +        if (cm_entry->cb == cb && cm_entry->arg == arg) {
>>> +            if (last_run) {
>>> +                cm_entry->last_run = last_run;
>>> +            }
>>> +
>>> +            if (time_threshold) {
>>> +                cm_entry->time_threshold = time_threshold;
>>> +            }
>>> +            return;
>>
>> The register() function doesn't check for duplicates, so there
>> can be duplicates if used inaccurately and they will not be
>> updated.
>>
>> Should we just have a single function like _set() that would
>> find and update or create if not found?  Instead of relying on
>> users to not add the same thing twice.
> 
> Yes, that makes sense. The fact that no duplicate check is done was
> documented, but condensing this to a single _set() will most likely
> make that behavior even more clear to the caller.
> 
>>> +        }
>>> +    }
>>> +}
>>> +
>>> +static void
>>> +cooperative_multitasking_yield_at__(const char *source_location)
>>> +{
>>> +    long long int now = time_msec();
>>> +    struct cooperative_multitasking_callback *cm_entry;
>>
>> Reverse x-mass tree.
> 
> Ack.
> 
>>> +
>>> +    HMAP_FOR_EACH (cm_entry, node, cooperative_multitasking_callbacks) {
>>> +        long long int elapsed = now - cm_entry->last_run;
>>> +
>>> +        if (elapsed >= cm_entry->time_threshold) {
>>> +            VLOG_DBG("yield called from %s: "
>>> +                     "%lld: %lld >= %lld, executing %p(%p)",
>>> +                     source_location, now, elapsed, cm_entry->time_threshold,
>>> +                     cm_entry->cb, cm_entry->arg);
>>
>> Maybe an empty line here.
>> Also, the pointers are not very informative.  Should something
>> like 'name' be added to the structure?  So, it will be %s(%p).
>>
>> Not sure why logging 'now'.  Maybe re-organize things a little:
>>
>> "%{source}: yield for %{name}: elapsed(%lld) >= threshold(%lld), overrun: %lld"
>>
>> What do you think?
>>
>>> +            (*cm_entry->cb)(cm_entry->arg);
>>> +            if (elapsed - cm_entry->time_threshold >
>>> +                cm_entry->time_threshold / 8)
>>
>> Callback updates thresholds, so the check should be done before
>> calling it.   Maybe 'time_' part of the structure filed is
>> redundant.  Without it the lines will be shorter.
> 
> Thank you for pointing out that the callback can update the threshold,
> that fact eluded me in some iteration.
> 
> My original thought for splitting this across two calls was to avoid
> delaying the callback even further by spending time on producing the
> backtrace. But I guess it's not that expensive and it is only
> performed during debugging.  Doing it before also avoids polluting the
> backtrace with the call to the callback.
> 
>>> +            {
>>
>> Should be on a previous line.
>>
>>> +                VLOG_WARN("yield threshold overrun with %lld msec. %s",
>>> +                          elapsed - cm_entry->time_threshold,
>>> +                          cm_entry->msg ? cm_entry->msg : "");
>>
>> If we add a name, we should print the callback name here as well.
>> And we may even drop the 'msg', because it will be more or less
>> clear what went wrong if we know that it is a raft_run, for example.
>>
>> Also, we could combine two log calls into one by using base VLOG()
>> macro and choosing the log level conditionally, e.g.:
>>
>>     bool warn = elapsed - entry->threshold > entry->threshold / 8;
>>
>>     VLOG(warn ? VLL_WARN : VLL_DBG, ... );
>>     if (warn && VLOG_IS_DBG_ENABLED()) {
>>         backtrace
>>     }
>>     entry->cb();
> 
> Yes, that is indeed much nicer, thx!
> 
>>> +                if (VLOG_IS_DBG_ENABLED()) {
>>> +                    /* log_backtrace() logs at ERROR level but we only want to
>>> +                     * log a backtrace when DEBUG is enabled */
>>> +                    log_backtrace();
>>> +                }
>>> +            }
>>
>> Should we also update 'now'?  The callback itself might have taken
>> some time.
> 
> Yes, good point.
> 
>>> +        }
>>> +    }
>>
>> Maybe log a warning if overall yield run took more than, let's say, a second?
> 
> Ack.
> 
>>> +}
>>> +
>>> +/* Iterate over registered callbacks and execute callbacks as demanded by the
>>> + * recorded time threshold. */
>>> +void
>>> +cooperative_multitasking_yield_at(const char *source_location)
>>> +{
>>> +    static bool yield_in_progress = false;
>>> +
>>> +    if (!cooperative_multitasking_callbacks) {
>>> +        return;
>>> +    }
>>> +
>>> +    if (yield_in_progress) {
>>> +        VLOG_ERR_ONCE("nested yield avoided, this is a bug! "
>>> +                      "enable debug logging for more details.");
>>
>> Might be better to capitalize the first letters in sentences.
>> The second one is bothering me :) , but if we capitalize the
>> second, then we also need to do the first.
> 
> Ack.
> 
>>> +        if (VLOG_IS_DBG_ENABLED()) {
>>> +            VLOG_DBG("nested yield, called from %s", source_location);
>>> +            log_backtrace();
>>> +        }
>>> +        return;
>>> +    }
>>> +    yield_in_progress = true;
>>> +
>>> +    cooperative_multitasking_yield_at__(source_location);
>>> +
>>> +    yield_in_progress = false;
>>> +}
>>> diff --git a/lib/cooperative-multitasking.h b/lib/cooperative-multitasking.h
>>> new file mode 100644
>>> index 000000000..6286bfbf5
>>> --- /dev/null
>>> +++ b/lib/cooperative-multitasking.h
>>> @@ -0,0 +1,42 @@
>>> +/*
>>> + * Copyright (c) 2023 Canonical Ltd.
>>> + *
>>> + * Licensed under the Apache License, Version 2.0 (the "License");
>>> + * you may not use this file except in compliance with the License.
>>> + * You may obtain a copy of the License at:
>>> + *
>>> + *     http://www.apache.org/licenses/LICENSE-2.0
>>> + *
>>> + * Unless required by applicable law or agreed to in writing, software
>>> + * distributed under the License is distributed on an "AS IS" BASIS,
>>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>>> + * See the License for the specific language governing permissions and
>>> + * limitations under the License.
>>> + */
>>> +
>>> +#ifndef COOPERATIVE_MULTITASKING_H
>>> +#define COOPERATIVE_MULTITASKING_H 1
>>> +
>>
>> I think, we need a comment here describing the library and some use cases.
>> At least a little bit.  It should also have a thread safety section
>> clearly stating that the library is not thread safe can only be used from
>> one thread.
> 
> Ack.
> 
>>> +struct hmap;
>>> +
>>> +void cooperative_multitasking_init(struct hmap *);
>>> +
>>> +void cooperative_multitasking_register(void (*)(void *), void *,
>>> +                                       long long int, const char *);
>>> +#define COOPERATIVE_MULTITASKING_REGISTER(CB, ARG, TIME_THRESHOLD, MSG)       \
>>> +    cooperative_multitasking_register((void (*)(void *)) CB, (void *) ARG,    \
>>> +                                      TIME_THRESHOLD, MSG)
>>> +
>>> +void cooperative_multitasking_destroy(void);
>>
>> We probably need an unregister/remove-style function as well.
>> For example, if we remove clustered database in runtime, we need
>> to make sure the callback will not be called for a destroyed
>> raft structure.
> 
> Yes, that is a good point. I'll add that.
> 
>>> +
>>> +void cooperative_multitasking_update(void (*)(void *), void *, long long int,
>>> +                                     long long int);
>>> +#define COOPERATIVE_MULTITASKING_UPDATE(CB, ARG, LAST_RUN, TIME_THRESHOLD)    \
>>> +    cooperative_multitasking_update((void (*) (void *)) CB, (void *) ARG,     \
>>> +                                    LAST_RUN, TIME_THRESHOLD)
>>
>> I don't think we should have these macros.  I see that their purpose
>> is to hide pointer conversions, but unfortunately we can't really do that.
>> We need to make sure that all the callbacks actually have void (*) (void *))
>> prototypes and the arguments of these callbacks are actually void pointers.
>>
>> The problem is that llvm devs decided that it is an undefined behavior
>> to call functions via pointer with a different type:
>>   http://reviews.llvm.org/D148827#4379764
>> And now UBSAN in clang-17 is complaining all over the place for multiple
>> projects:
>>   https://github.com/systemd/systemd/issues/29972
>>   https://github.com/openssl/openssl/issues/22896
>>
>> This also affects OVS in rcu callbacks, but we didn't get around fixing
>> this yet, as release right now is much more important.
>> However, it'll be better to not introduce more issues like this.
>>
>> Unfortunately we'll likely have to have a wrapper function like:
>>
>>   void raft_run_wrap(void *arg)
>>   {
>>       struct raft *raft = (struct raft *) arg;
>>
>>       raft_run(raft);
>>   }
>>
>> And use that as a callback...  It is ugly and looks very unnecessary,
>> but llvm folks are not backing down from this and it looks like the only
>> option for generic libraries like RCU or coop-multitasking.
> 
> Thank you for elaborating on this, I was unaware of this looming
> compiler change on the horizon.
> 
>>> +
>>> +void cooperative_multitasking_yield_at(const char *);
>>> +#define cooperative_multitasking_yield() \
>>> +    cooperative_multitasking_yield_at(OVS_SOURCE_LOCATOR)
>>> +
>>> +#endif /* COOPERATIVE_MULTITASKING_H */
>>> diff --git a/tests/automake.mk b/tests/automake.mk
>>> index 10c9fbb01..08c9b74d4 100644
>>> --- a/tests/automake.mk
>>> +++ b/tests/automake.mk
>>> @@ -456,6 +456,7 @@ tests_ovstest_SOURCES = \
>>>       tests/test-ccmap.c \
>>>       tests/test-cmap.c \
>>>       tests/test-conntrack.c \
>>> +     tests/test-cooperative-multitasking.c \
>>>       tests/test-csum.c \
>>>       tests/test-flows.c \
>>>       tests/test-hash.c \
>>> diff --git a/tests/library.at b/tests/library.at
>>> index 3f9df2f87..77d5abb01 100644
>>> --- a/tests/library.at
>>> +++ b/tests/library.at
>>> @@ -296,3 +296,13 @@ AT_CLEANUP
>>>  AT_SETUP([uuidset module])
>>>  AT_CHECK([ovstest test-uuidset], [0], [], [ignore])
>>>  AT_CLEANUP
>>> +
>>> +AT_SETUP([cooperative-multitasking module])
>>> +AT_CHECK([ovstest test-cooperative-multitasking], [0], [])
>>> +AT_CLEANUP
>>> +
>>> +AT_SETUP([cooperative-multitasking module nested yield detection])
>>> +AT_CHECK([ovstest test-cooperative-multitasking-nested-yield], [0], [], [dnl
>>> +cooperative_multitasking|ERR|nested yield avoided, this is a bug! enable debug logging for more details.
>>> +])
>>> +AT_CLEANUP
>>> diff --git a/tests/ovsdb-server.at b/tests/ovsdb-server.at
>>> index 6eb758e22..88a9a9a27 100644
>>> --- a/tests/ovsdb-server.at
>>> +++ b/tests/ovsdb-server.at
>>> @@ -2387,6 +2387,7 @@ m4_define([CLEAN_LOG_FILE],
>>>    [sed 's/[[0-9\-]]*T[[0-9:\.]]*Z|[[0-9]]*\(|.*$\)/\1/g' $1 | dnl
>>>     sed '/|poll_loop|/d' |   dnl
>>>     sed '/|socket_util|/d' | dnl
>>> +   sed '/|cooperative_multitasking|DBG|/d' | dnl
>>>     sed 's/[[0-9]]*\.ctl/<cleared>\.ctl/g'> $2])
>>>
>>>  CLEAN_LOG_FILE([1.log], [1.log.clear])
>>> diff --git a/tests/test-cooperative-multitasking.c b/tests/test-cooperative-multitasking.c
>>> new file mode 100644
>>> index 000000000..ec6be865f
>>> --- /dev/null
>>> +++ b/tests/test-cooperative-multitasking.c
>>> @@ -0,0 +1,259 @@
>>> +/*
>>> + * Copyright (c) 2023 Canonical Ltd.
>>> + *
>>> + * Licensed under the Apache License, Version 2.0 (the "License");
>>> + * you may not use this file except in compliance with the License.
>>> + * You may obtain a copy of the License at:
>>> + *
>>> + *     http://www.apache.org/licenses/LICENSE-2.0
>>> + *
>>> + * Unless required by applicable law or agreed to in writing, software
>>> + * distributed under the License is distributed on an "AS IS" BASIS,
>>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>>> + * See the License for the specific language governing permissions and
>>> + * limitations under the License.
>>> + */
>>> +
>>> +#include <config.h>
>>> +#undef NDEBUG
>>> +#include "cooperative-multitasking.h"
>>> +#include "cooperative-multitasking-private.h"
>>> +#include "openvswitch/hmap.h"
>>> +#include "ovstest.h"
>>> +#include "timeval.h"
>>> +#include "util.h"
>>> +#include "openvswitch/vlog.h"
>>> +
>>> +static struct hmap cm_callbacks;
>>> +
>>> +struct fixture_arg {
>>> +    bool called;
>>> +};
>>> +
>>> +static void
>>> +fixture_run(struct fixture_arg *arg)
>>
>> These will have to have a void * arguments...
> 
> I'll add the wrap functions to the test as well.
> 
>>> +{
>>> +    COOPERATIVE_MULTITASKING_UPDATE(&fixture_run, arg, time_msec(), 0);
>>> +    if (arg) {
>>> +        arg->called = true;
>>> +    }
>>> +}
>>> +
>>> +static void
>>> +fixture_other_run(struct fixture_arg *arg)
>>> +{
>>> +    COOPERATIVE_MULTITASKING_UPDATE(&fixture_other_run, arg, time_msec(), 0);
>>> +    if (arg) {
>>> +        arg->called = true;
>>> +    }
>>> +}
>>> +
>>> +static void
>>> +test_cm_register(void)
>>> +{
>>> +    struct cooperative_multitasking_callback *cm_entry;
>>> +    struct fixture_arg arg1 = {
>>> +        .called = false,
>>> +    };
>>> +    struct fixture_arg arg2 = {
>>> +        .called = false,
>>> +    };
>>> +
>>> +    timeval_stop();
>>> +    long long int now = time_msec();
>>> +
>>> +    COOPERATIVE_MULTITASKING_REGISTER(&fixture_run, &arg1, 1000, NULL);
>>> +    COOPERATIVE_MULTITASKING_REGISTER(&fixture_run, &arg2, 2000, NULL);
>>> +    COOPERATIVE_MULTITASKING_REGISTER(&fixture_other_run, NULL, 3000, NULL);
>>> +
>>> +    ovs_assert(hmap_count(&cm_callbacks) == 3);
>>> +
>>> +    HMAP_FOR_EACH (cm_entry, node, &cm_callbacks) {
>>> +        if (cm_entry->arg == (void *)&arg1) {
>>> +            ovs_assert (cm_entry->cb == (void (*)(void *)) &fixture_run);
>>> +            ovs_assert (cm_entry->time_threshold == 1000);
>>> +            ovs_assert (cm_entry->last_run == now);
>>> +        } else if (cm_entry->arg == (void *)&arg2) {
>>> +            ovs_assert (cm_entry->cb == (void (*)(void *)) &fixture_run);
>>> +            ovs_assert (cm_entry->time_threshold == 2000);
>>> +            ovs_assert (cm_entry->last_run == now);
>>> +        } else if (cm_entry->cb == (void (*)(void *)) &fixture_other_run) {
>>> +            ovs_assert (cm_entry->arg == NULL);
>>> +            ovs_assert (cm_entry->time_threshold == 3000);
>>> +            ovs_assert (cm_entry->last_run == now);
>>
>> Nit: no space after assert, but a space between the type and a
>> variable in a cast.  Same for other similar parts of the test.
> 
> Ack.
> 
>>> +        } else {
>>> +            OVS_NOT_REACHED();
>>> +        }
>>> +    }
>>> +
>>> +    cooperative_multitasking_destroy();
>>> +}
>>> +
>>> +static void
>>> +test_cm_update(void)
>>> +{
>>> +    struct cooperative_multitasking_callback *cm_entry;
>>> +    struct fixture_arg arg1 = {
>>> +        .called = false,
>>> +    };
>>> +    struct fixture_arg arg2 = {
>>> +        .called = false,
>>> +    };
>>> +
>>> +    timeval_stop();
>>> +    long long int now = time_msec();
>>> +
>>> +    /* first register a couple of callbacks. */
>>
>> Should start with a capital letter.  Same for all other comments in a file.
> 
> Ack.
> 
>>> +    COOPERATIVE_MULTITASKING_REGISTER(&fixture_run, &arg1, 0, NULL);
>>> +    COOPERATIVE_MULTITASKING_REGISTER(&fixture_run, &arg2, 0, NULL);
>>> +
>>> +    ovs_assert(hmap_count(&cm_callbacks) == 2);
>>> +
>>> +    HMAP_FOR_EACH (cm_entry, node, &cm_callbacks) {
>>> +        if (cm_entry->arg == (void *)&arg1) {
>>> +            ovs_assert (cm_entry->time_threshold == 0);
>>> +            ovs_assert (cm_entry->last_run == now);
>>> +        } else if (cm_entry->arg == (void *)&arg2) {
>>> +            ovs_assert (cm_entry->time_threshold == 0);
>>> +            ovs_assert (cm_entry->last_run == now);
>>> +        } else {
>>> +            OVS_NOT_REACHED();
>>> +        }
>>> +    }
>>> +
>>> +    /* update 'last_run' and 'time_threshold' for each callback and validate
>>> +     * that the correct entry was actually updated. */
>>> +    COOPERATIVE_MULTITASKING_UPDATE(&fixture_run, &arg1, 1, 2);
>>> +    COOPERATIVE_MULTITASKING_UPDATE(&fixture_run, &arg2, 3, 4);
>>> +
>>> +    HMAP_FOR_EACH (cm_entry, node, &cm_callbacks) {
>>> +        if (cm_entry->arg == (void *)&arg1) {
>>> +            ovs_assert (cm_entry->time_threshold == 2);
>>> +            ovs_assert (cm_entry->last_run == 1);
>>> +        } else if (cm_entry->arg == (void *)&arg2) {
>>> +            ovs_assert (cm_entry->time_threshold == 4);
>>> +            ovs_assert (cm_entry->last_run == 3);
>>> +        } else {
>>> +            OVS_NOT_REACHED();
>>> +        }
>>> +    }
>>> +
>>> +    /* confirm that providing 0 for 'last_run' or 'time_threshold' leaves the
>>> +     * existing value untouched. */
>>> +    COOPERATIVE_MULTITASKING_UPDATE(&fixture_run, &arg1, 0, 5);
>>> +    COOPERATIVE_MULTITASKING_UPDATE(&fixture_run, &arg2, 6, 0);
>>> +
>>> +    HMAP_FOR_EACH (cm_entry, node, &cm_callbacks) {
>>> +        if (cm_entry->arg == (void *)&arg1) {
>>> +            ovs_assert (cm_entry->time_threshold == 5);
>>> +            ovs_assert (cm_entry->last_run == 1);
>>> +        } else if (cm_entry->arg == (void *)&arg2) {
>>> +            ovs_assert (cm_entry->time_threshold == 4);
>>> +            ovs_assert (cm_entry->last_run == 6);
>>> +        } else {
>>> +            OVS_NOT_REACHED();
>>> +        }
>>> +    }
>>> +
>>> +    cooperative_multitasking_destroy();
>>> +}
>>> +
>>> +static void
>>> +test_cm_yield(void)
>>> +{
>>> +    struct cooperative_multitasking_callback *cm_entry;
>>> +    struct fixture_arg arg1 = {
>>> +        .called = false,
>>> +    };
>>> +    struct fixture_arg arg2 = {
>>> +        .called = false,
>>> +    };
>>> +
>>> +    timeval_stop();
>>> +    long long int now = time_msec();
>>> +
>>> +    /* first register a couple of callbacks. */
>>> +    COOPERATIVE_MULTITASKING_REGISTER(&fixture_run, &arg1, 1000, NULL);
>>> +    COOPERATIVE_MULTITASKING_REGISTER(&fixture_run, &arg2, 2000, NULL);
>>> +
>>> +    ovs_assert(hmap_count(&cm_callbacks) == 2);
>>> +
>>> +    /* call to yield should not execute callbacks until time threshold. */
>>> +    cooperative_multitasking_yield();
>>> +    ovs_assert(arg1.called == false);
>>> +    ovs_assert(arg2.called == false);
>>> +
>>> +    HMAP_FOR_EACH (cm_entry, node, &cm_callbacks) {
>>> +        ovs_assert(cm_entry->last_run == now);
>>> +    }
>>> +
>>> +    /* move clock forward and confirm the expected callbacks to be executed. */
>>> +    timeval_warp(0, 1000);
>>> +    timeval_stop();
>>> +    cooperative_multitasking_yield();
>>> +    ovs_assert(arg1.called == true);
>>> +    ovs_assert(arg2.called == false);
>>> +
>>> +    /* move clock forward and confirm the expected callbacks to be executed. */
>>> +    arg1.called = arg2.called = false;
>>> +    timeval_warp(0, 1000);
>>> +    timeval_stop();
>>> +    cooperative_multitasking_yield();
>>> +    ovs_assert(arg1.called == true);
>>> +    ovs_assert(arg2.called == true);
>>> +
>>> +    timeval_warp(0, 1);
>>> +    cooperative_multitasking_destroy();
>>> +}
>>> +
>>> +static void
>>> +fixture_buggy_run(struct fixture_arg *arg)
>>> +{
>>> +    COOPERATIVE_MULTITASKING_UPDATE(&fixture_buggy_run, arg, time_msec(), 0);
>>> +    if (arg) {
>>> +        arg->called = true;
>>> +    }
>>> +    /* A real run function MUST NOT directly or indirectly call yield, this is
>>> +     * here to test the detection of such a programming error. */
>>> +    cooperative_multitasking_yield();
>>> +}
>>> +
>>> +static void
>>> +test_cooperative_multitasking_nested_yield(int argc OVS_UNUSED, char *argv[])
>>> +{
>>> +    struct fixture_arg arg1 = {
>>> +        .called = false,
>>> +    };
>>> +
>>> +    set_program_name(argv[0]);
>>> +    vlog_set_pattern(VLF_CONSOLE, "%c|%p|%m");
>>> +    vlog_set_levels(NULL, VLF_SYSLOG, VLL_OFF);
>>> +
>>> +    time_msec(); /* ensure timeval is initialized */
>>
>> Capital letter and a period at the end.
> 
> Ack.
> 
>>> +    timeval_timewarp_enable();
>>> +
>>> +    cooperative_multitasking_init(&cm_callbacks);
>>> +
>>> +    COOPERATIVE_MULTITASKING_REGISTER(&fixture_buggy_run, &arg1, 1000, NULL);
>>> +    timeval_warp(0, 1000);
>>> +    cooperative_multitasking_yield();
>>> +    cooperative_multitasking_destroy();
>>> +}
>>> +
>>> +static void
>>> +test_cooperative_multitasking(int argc OVS_UNUSED, char *argv[] OVS_UNUSED)
>>> +{
>>> +    time_msec(); /* ensure timeval is initialized */
>>
>> Ditto.
> 
> Ack.
> 
>>> +    timeval_timewarp_enable();
>>> +
>>> +    cooperative_multitasking_init(&cm_callbacks);
>>> +
>>> +    test_cm_register();
>>> +    test_cm_update();
>>> +    test_cm_yield();
>>> +}
>>> +
>>> +OVSTEST_REGISTER("test-cooperative-multitasking",
>>> +                 test_cooperative_multitasking);
>>> +OVSTEST_REGISTER("test-cooperative-multitasking-nested-yield",
>>> +                 test_cooperative_multitasking_nested_yield);
>>
> 
>
Frode Nordahl Jan. 12, 2024, 3:28 p.m. UTC | #4
On Fri, Jan 12, 2024 at 3:11 PM Ilya Maximets <i.maximets@ovn.org> wrote:
>
> On 1/12/24 12:16, Frode Nordahl wrote:
> > On Fri, Jan 12, 2024 at 12:26 AM Ilya Maximets <i.maximets@ovn.org> wrote:
> >>
> >> On 1/10/24 20:29, Frode Nordahl wrote:
> >>> One of the goals of Open vSwitch is to be as resource efficient as
> >>> possible.  Core parts of the program has been implemented as
> >>> asynchronous state machines, and when absolutely necessary
> >>> additional threads are used.
> >>>
> >>> Introduce cooperative multitasking module which allow us to
> >>> interleave important processing with long running tasks while
> >>> avoiding the additional resource consumption of threads and
> >>> complexity of asynchronous state machines.
> >>>
> >>> We will use this module to ensure long running processing in the
> >>> OVSDB server does not interfere with stable maintenance of the
> >>> RAFT cluster in subsequent patches.
> >>>
> >>> Suggested-by: Ilya Maximets <i.maximets@ovn.org>
> >>> Signed-off-by: Frode Nordahl <frode.nordahl@canonical.com>
> >>> ---
> >>>  lib/automake.mk                        |   3 +
> >>>  lib/cooperative-multitasking-private.h |  31 +++
> >>>  lib/cooperative-multitasking.c         | 195 +++++++++++++++++++
> >>>  lib/cooperative-multitasking.h         |  42 ++++
> >>>  tests/automake.mk                      |   1 +
> >>>  tests/library.at                       |  10 +
> >>>  tests/ovsdb-server.at                  |   1 +
> >>>  tests/test-cooperative-multitasking.c  | 259 +++++++++++++++++++++++++
> >>>  8 files changed, 542 insertions(+)
> >>>  create mode 100644 lib/cooperative-multitasking-private.h
> >>>  create mode 100644 lib/cooperative-multitasking.c
> >>>  create mode 100644 lib/cooperative-multitasking.h
> >>>  create mode 100644 tests/test-cooperative-multitasking.c
> >>
> >> Thanks for working on this!  See some comments inline.
> >
> > Thank you for providing the counter proposal for this approach, I
> > think it will turn out much better than the original approach.
> >
> >>>
> >>> diff --git a/lib/automake.mk b/lib/automake.mk
> >>> index 0dc8a35cc..8596171c6 100644
> >>> --- a/lib/automake.mk
> >>> +++ b/lib/automake.mk
> >>> @@ -94,6 +94,9 @@ lib_libopenvswitch_la_SOURCES = \
> >>>       lib/conntrack-other.c \
> >>>       lib/conntrack.c \
> >>>       lib/conntrack.h \
> >>> +     lib/cooperative-multitasking.c \
> >>> +     lib/cooperative-multitasking.h \
> >>> +     lib/cooperative-multitasking-private.h \
> >>>       lib/coverage.c \
> >>>       lib/coverage.h \
> >>>       lib/cpu.c \
> >>> diff --git a/lib/cooperative-multitasking-private.h b/lib/cooperative-multitasking-private.h
> >>> new file mode 100644
> >>> index 000000000..b2e4e7291
> >>> --- /dev/null
> >>> +++ b/lib/cooperative-multitasking-private.h
> >>> @@ -0,0 +1,31 @@
> >>> +/*
> >>> + * Copyright (c) 2024 Canonical Ltd.
> >>> + *
> >>> + * Licensed under the Apache License, Version 2.0 (the "License");
> >>> + * you may not use this file except in compliance with the License.
> >>> + * You may obtain a copy of the License at:
> >>> + *
> >>> + *     http://www.apache.org/licenses/LICENSE-2.0
> >>> + *
> >>> + * Unless required by applicable law or agreed to in writing, software
> >>> + * distributed under the License is distributed on an "AS IS" BASIS,
> >>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> >>> + * See the License for the specific language governing permissions and
> >>> + * limitations under the License.
> >>> + */
> >>> +
> >>> +#ifndef COOPERATIVE_MULTITASKING_PRIVATE_H
> >>> +#define COOPERATIVE_MULTITASKING_PRIVATE_H 1
> >>> +
> >>> +#include "openvswitch/hmap.h"
> >>> +
> >>> +struct cooperative_multitasking_callback {
> >>> +    struct hmap_node node;
> >>> +    void (*cb)(void *);
> >>> +    void *arg;
> >>> +    long long int time_threshold;
> >>> +    long long int last_run;
> >>> +    const char *msg;
> >>> +};
> >>> +
> >>> +#endif /* COOPERATIVE_MULTITASKING_PRIVATE_H */
> >>> diff --git a/lib/cooperative-multitasking.c b/lib/cooperative-multitasking.c
> >>> new file mode 100644
> >>> index 000000000..1b1205e8f
> >>> --- /dev/null
> >>> +++ b/lib/cooperative-multitasking.c
> >>> @@ -0,0 +1,195 @@
> >>> +/*
> >>> + * Copyright (c) 2023 Canonical Ltd.
> >>
> >> 2024 ? :)
> >>
> >> Same for other files.
> >
> > I started the development before the break, so the files were actually
> > created in 2023, and we are now indeed in 2024 so it makes sense to
> > update :)
> >
> >>> + *
> >>> + * Licensed under the Apache License, Version 2.0 (the "License");
> >>> + * you may not use this file except in compliance with the License.
> >>> + * You may obtain a copy of the License at:
> >>> + *
> >>> + *     http://www.apache.org/licenses/LICENSE-2.0
> >>> + *
> >>> + * Unless required by applicable law or agreed to in writing, software
> >>> + * distributed under the License is distributed on an "AS IS" BASIS,
> >>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> >>> + * See the License for the specific language governing permissions and
> >>> + * limitations under the License.
> >>> + */
> >>> +
> >>> +#include <config.h>
> >>> +
> >>> +#include "backtrace.h"
> >>> +#include "cooperative-multitasking-private.h"
> >>> +#include "cooperative-multitasking.h"
> >>> +#include "hash.h"
> >>> +#include "openvswitch/hmap.h"
> >>> +#include "openvswitch/vlog.h"
> >>> +#include "timeval.h"
> >>> +
> >>> +VLOG_DEFINE_THIS_MODULE(cooperative_multitasking);
> >>> +
> >>> +static struct hmap *cooperative_multitasking_callbacks = NULL;
> >>
> >> I wonder if we can use a shorter name.  You're using 'cm_callbacks'
> >> in the test, for example.  It is an internal hash map after all.
> >> Probably, no need to have an extra fancy name.
> >
> > I was mostly concerned with consistent namespacing since it is a
> > "global" variable, as-is it is contained to the module as you point
> > out.
> >
> > However, if we're going to share it with the unit test through extern
> > as proposed below I suggest we keep the long name to avoid potential
> > symbol clashes?
>
> I'm not concerned much about symbol clashes, since the header must
> not be included anywhere, except for the module itself and the test
> code.  The test already knows too much about internals of the module,
> so we can expect it to be aware of the internal naming.  But it is
> up to you.

Hum, I'm sure I'm doing something different than what you intended
with your comment here then, with:
lib/cooperative-multitasking.c:
struct hmap cooperative_multitasking_callbacks = HMAP_INITIALIZER(
    &cooperative_multitasking_callbacks);

lib/cooperative-multitasking-private.h
extern struct hmap cooperative_multitasking_callbacks;

The linker will see `cooperative_multitasking_callbacks` as a global symbol:
$ nm ovsdb/ovsdb-server|grep callbacks
0000000000245598 D cooperative_multitasking_callbacks

and subsequently it will not be available for use in other modules,
regardless of which header is included.
Ilya Maximets Jan. 12, 2024, 3:50 p.m. UTC | #5
On 1/12/24 16:28, Frode Nordahl wrote:
> On Fri, Jan 12, 2024 at 3:11 PM Ilya Maximets <i.maximets@ovn.org> wrote:
>>
>> On 1/12/24 12:16, Frode Nordahl wrote:
>>> On Fri, Jan 12, 2024 at 12:26 AM Ilya Maximets <i.maximets@ovn.org> wrote:
>>>>
>>>> On 1/10/24 20:29, Frode Nordahl wrote:
>>>>> One of the goals of Open vSwitch is to be as resource efficient as
>>>>> possible.  Core parts of the program has been implemented as
>>>>> asynchronous state machines, and when absolutely necessary
>>>>> additional threads are used.
>>>>>
>>>>> Introduce cooperative multitasking module which allow us to
>>>>> interleave important processing with long running tasks while
>>>>> avoiding the additional resource consumption of threads and
>>>>> complexity of asynchronous state machines.
>>>>>
>>>>> We will use this module to ensure long running processing in the
>>>>> OVSDB server does not interfere with stable maintenance of the
>>>>> RAFT cluster in subsequent patches.
>>>>>
>>>>> Suggested-by: Ilya Maximets <i.maximets@ovn.org>
>>>>> Signed-off-by: Frode Nordahl <frode.nordahl@canonical.com>
>>>>> ---
>>>>>  lib/automake.mk                        |   3 +
>>>>>  lib/cooperative-multitasking-private.h |  31 +++
>>>>>  lib/cooperative-multitasking.c         | 195 +++++++++++++++++++
>>>>>  lib/cooperative-multitasking.h         |  42 ++++
>>>>>  tests/automake.mk                      |   1 +
>>>>>  tests/library.at                       |  10 +
>>>>>  tests/ovsdb-server.at                  |   1 +
>>>>>  tests/test-cooperative-multitasking.c  | 259 +++++++++++++++++++++++++
>>>>>  8 files changed, 542 insertions(+)
>>>>>  create mode 100644 lib/cooperative-multitasking-private.h
>>>>>  create mode 100644 lib/cooperative-multitasking.c
>>>>>  create mode 100644 lib/cooperative-multitasking.h
>>>>>  create mode 100644 tests/test-cooperative-multitasking.c
>>>>
>>>> Thanks for working on this!  See some comments inline.
>>>
>>> Thank you for providing the counter proposal for this approach, I
>>> think it will turn out much better than the original approach.
>>>
>>>>>
>>>>> diff --git a/lib/automake.mk b/lib/automake.mk
>>>>> index 0dc8a35cc..8596171c6 100644
>>>>> --- a/lib/automake.mk
>>>>> +++ b/lib/automake.mk
>>>>> @@ -94,6 +94,9 @@ lib_libopenvswitch_la_SOURCES = \
>>>>>       lib/conntrack-other.c \
>>>>>       lib/conntrack.c \
>>>>>       lib/conntrack.h \
>>>>> +     lib/cooperative-multitasking.c \
>>>>> +     lib/cooperative-multitasking.h \
>>>>> +     lib/cooperative-multitasking-private.h \
>>>>>       lib/coverage.c \
>>>>>       lib/coverage.h \
>>>>>       lib/cpu.c \
>>>>> diff --git a/lib/cooperative-multitasking-private.h b/lib/cooperative-multitasking-private.h
>>>>> new file mode 100644
>>>>> index 000000000..b2e4e7291
>>>>> --- /dev/null
>>>>> +++ b/lib/cooperative-multitasking-private.h
>>>>> @@ -0,0 +1,31 @@
>>>>> +/*
>>>>> + * Copyright (c) 2024 Canonical Ltd.
>>>>> + *
>>>>> + * Licensed under the Apache License, Version 2.0 (the "License");
>>>>> + * you may not use this file except in compliance with the License.
>>>>> + * You may obtain a copy of the License at:
>>>>> + *
>>>>> + *     http://www.apache.org/licenses/LICENSE-2.0
>>>>> + *
>>>>> + * Unless required by applicable law or agreed to in writing, software
>>>>> + * distributed under the License is distributed on an "AS IS" BASIS,
>>>>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>>>>> + * See the License for the specific language governing permissions and
>>>>> + * limitations under the License.
>>>>> + */
>>>>> +
>>>>> +#ifndef COOPERATIVE_MULTITASKING_PRIVATE_H
>>>>> +#define COOPERATIVE_MULTITASKING_PRIVATE_H 1
>>>>> +
>>>>> +#include "openvswitch/hmap.h"
>>>>> +
>>>>> +struct cooperative_multitasking_callback {
>>>>> +    struct hmap_node node;
>>>>> +    void (*cb)(void *);
>>>>> +    void *arg;
>>>>> +    long long int time_threshold;
>>>>> +    long long int last_run;
>>>>> +    const char *msg;
>>>>> +};
>>>>> +
>>>>> +#endif /* COOPERATIVE_MULTITASKING_PRIVATE_H */
>>>>> diff --git a/lib/cooperative-multitasking.c b/lib/cooperative-multitasking.c
>>>>> new file mode 100644
>>>>> index 000000000..1b1205e8f
>>>>> --- /dev/null
>>>>> +++ b/lib/cooperative-multitasking.c
>>>>> @@ -0,0 +1,195 @@
>>>>> +/*
>>>>> + * Copyright (c) 2023 Canonical Ltd.
>>>>
>>>> 2024 ? :)
>>>>
>>>> Same for other files.
>>>
>>> I started the development before the break, so the files were actually
>>> created in 2023, and we are now indeed in 2024 so it makes sense to
>>> update :)
>>>
>>>>> + *
>>>>> + * Licensed under the Apache License, Version 2.0 (the "License");
>>>>> + * you may not use this file except in compliance with the License.
>>>>> + * You may obtain a copy of the License at:
>>>>> + *
>>>>> + *     http://www.apache.org/licenses/LICENSE-2.0
>>>>> + *
>>>>> + * Unless required by applicable law or agreed to in writing, software
>>>>> + * distributed under the License is distributed on an "AS IS" BASIS,
>>>>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>>>>> + * See the License for the specific language governing permissions and
>>>>> + * limitations under the License.
>>>>> + */
>>>>> +
>>>>> +#include <config.h>
>>>>> +
>>>>> +#include "backtrace.h"
>>>>> +#include "cooperative-multitasking-private.h"
>>>>> +#include "cooperative-multitasking.h"
>>>>> +#include "hash.h"
>>>>> +#include "openvswitch/hmap.h"
>>>>> +#include "openvswitch/vlog.h"
>>>>> +#include "timeval.h"
>>>>> +
>>>>> +VLOG_DEFINE_THIS_MODULE(cooperative_multitasking);
>>>>> +
>>>>> +static struct hmap *cooperative_multitasking_callbacks = NULL;
>>>>
>>>> I wonder if we can use a shorter name.  You're using 'cm_callbacks'
>>>> in the test, for example.  It is an internal hash map after all.
>>>> Probably, no need to have an extra fancy name.
>>>
>>> I was mostly concerned with consistent namespacing since it is a
>>> "global" variable, as-is it is contained to the module as you point
>>> out.
>>>
>>> However, if we're going to share it with the unit test through extern
>>> as proposed below I suggest we keep the long name to avoid potential
>>> symbol clashes?
>>
>> I'm not concerned much about symbol clashes, since the header must
>> not be included anywhere, except for the module itself and the test
>> code.  The test already knows too much about internals of the module,
>> so we can expect it to be aware of the internal naming.  But it is
>> up to you.
> 
> Hum, I'm sure I'm doing something different than what you intended
> with your comment here then, with:
> lib/cooperative-multitasking.c:
> struct hmap cooperative_multitasking_callbacks = HMAP_INITIALIZER(
>     &cooperative_multitasking_callbacks);
> 
> lib/cooperative-multitasking-private.h
> extern struct hmap cooperative_multitasking_callbacks;
> 
> The linker will see `cooperative_multitasking_callbacks` as a global symbol:
> $ nm ovsdb/ovsdb-server|grep callbacks
> 0000000000245598 D cooperative_multitasking_callbacks
> 
> and subsequently it will not be available for use in other modules,
> regardless of which header is included.
> 

Ah, I see what you mean.  I missed that it will become part of a
global namespace since it's not static anymore.  OK then.
Still up to you.  We may keep it as-is.  But if there are some
suggestions on how to shorten it, that would be nice.
We could potentially shorten it later, when someone has a sudden
epiphany on better naming. :D

Best regards, Ilya Maximets.
diff mbox series

Patch

diff --git a/lib/automake.mk b/lib/automake.mk
index 0dc8a35cc..8596171c6 100644
--- a/lib/automake.mk
+++ b/lib/automake.mk
@@ -94,6 +94,9 @@  lib_libopenvswitch_la_SOURCES = \
 	lib/conntrack-other.c \
 	lib/conntrack.c \
 	lib/conntrack.h \
+	lib/cooperative-multitasking.c \
+	lib/cooperative-multitasking.h \
+	lib/cooperative-multitasking-private.h \
 	lib/coverage.c \
 	lib/coverage.h \
 	lib/cpu.c \
diff --git a/lib/cooperative-multitasking-private.h b/lib/cooperative-multitasking-private.h
new file mode 100644
index 000000000..b2e4e7291
--- /dev/null
+++ b/lib/cooperative-multitasking-private.h
@@ -0,0 +1,31 @@ 
+/*
+ * Copyright (c) 2024 Canonical Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef COOPERATIVE_MULTITASKING_PRIVATE_H
+#define COOPERATIVE_MULTITASKING_PRIVATE_H 1
+
+#include "openvswitch/hmap.h"
+
+struct cooperative_multitasking_callback {
+    struct hmap_node node;
+    void (*cb)(void *);
+    void *arg;
+    long long int time_threshold;
+    long long int last_run;
+    const char *msg;
+};
+
+#endif /* COOPERATIVE_MULTITASKING_PRIVATE_H */
diff --git a/lib/cooperative-multitasking.c b/lib/cooperative-multitasking.c
new file mode 100644
index 000000000..1b1205e8f
--- /dev/null
+++ b/lib/cooperative-multitasking.c
@@ -0,0 +1,195 @@ 
+/*
+ * Copyright (c) 2023 Canonical Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <config.h>
+
+#include "backtrace.h"
+#include "cooperative-multitasking-private.h"
+#include "cooperative-multitasking.h"
+#include "hash.h"
+#include "openvswitch/hmap.h"
+#include "openvswitch/vlog.h"
+#include "timeval.h"
+
+VLOG_DEFINE_THIS_MODULE(cooperative_multitasking);
+
+static struct hmap *cooperative_multitasking_callbacks = NULL;
+
+/* One time initialization for process that wants to make use of cooperative
+ * multitasking module.  References to data is stored in 'hmap_container' and
+ * will be referenced by all calls to this module.  The ownership of the
+ * container itself remains with the caller while the data in the hmap is owned
+ * by this module and must be freed with a call to
+ * cooperative_multitasking_destroy().
+ *
+ * The purpose of having the caller own 'hmap_container' is:
+ * 1) Allow runtime decision whether to use cooperative multitasking without
+ *    having to pass data between loosely connected parts of a program.  This
+ *    is useful for the raft code which is consumed by both the ovsdb-server
+ *    daemon and the ovsdb-tool CLI utility.
+ * 2) Allow inspection of internal data by unit tests. */
+void
+cooperative_multitasking_init(struct hmap *hmap_container)
+{
+    cooperative_multitasking_callbacks = hmap_container;
+    hmap_init(cooperative_multitasking_callbacks);
+}
+
+/* Register callback 'cb' with argument 'arg' to be called when
+ * cooperating long running functions yield and 'time_threshold' msec has
+ * passed since the last call to the function.  If the optional 'msg' is not
+ * NULL it will be used when logging time threshold overrun conditions.
+ *
+ * It is possible to register the same callback multiple times as long as 'arg'
+ * is different for each registration.  It is up to the caller to ensure no
+ * unwanted duplicates are registered.
+ *
+ * The callback is expected to update the timestamp for last run with a call to
+ * cooperative_multitasking_update() using the same values for 'cb' and 'arg'.
+ */
+void
+cooperative_multitasking_register(void (*cb)(void *), void *arg,
+                                  long long int time_threshold,
+                                  const char *msg)
+{
+    if (!cooperative_multitasking_callbacks) {
+        return;
+    }
+
+    struct cooperative_multitasking_callback *cm_entry;
+
+    cm_entry = xzalloc(sizeof *cm_entry);
+    cm_entry->cb = cb;
+    cm_entry->arg = arg;
+    cm_entry->time_threshold = time_threshold;
+    cm_entry->last_run = time_msec();
+    cm_entry->msg = msg;
+
+    hmap_insert(cooperative_multitasking_callbacks,
+                &cm_entry->node,
+                hash_pointer(
+                    cm_entry->arg ? cm_entry->arg : (void *) cm_entry->cb, 0));
+}
+
+/* Free any data allocated by calls to cooperative_multitasking_register(). */
+void
+cooperative_multitasking_destroy(void)
+{
+    struct cooperative_multitasking_callback *cm_entry;
+    HMAP_FOR_EACH_SAFE (cm_entry, node, cooperative_multitasking_callbacks) {
+        hmap_remove(cooperative_multitasking_callbacks, &cm_entry->node);
+        free(cm_entry);
+    }
+}
+
+/* Update data for already registered callback identified by 'cb' and 'arg'.
+ *
+ * The value for 'last_run' must at a minimal be updated each time the callback
+ * is run.  It can also be useful to update for multiple entry points to the
+ * part serviced by the callback to avoid unnecessary callbacks on next call to
+ * cooperative_multitasking_yield().
+ *
+ * Updating the value for 'time_threshold' may be necessary as a consequence of
+ * the change in runtime configuration or requirements of the serviced
+ * callback.
+ *
+ * Providing a value of 0 for 'last_run' or 'time_threshold' will result in
+ * the respective stored value left untouched. */
+void
+cooperative_multitasking_update(void (*cb)(void *), void *arg,
+                                long long int last_run,
+                                long long int time_threshold)
+{
+    if (!cooperative_multitasking_callbacks) {
+        return;
+    }
+
+    struct cooperative_multitasking_callback *cm_entry;
+
+    HMAP_FOR_EACH_WITH_HASH (cm_entry, node,
+                             hash_pointer(arg ? arg : (void *) cb, 0),
+                             cooperative_multitasking_callbacks)
+    {
+        if (cm_entry->cb == cb && cm_entry->arg == arg) {
+            if (last_run) {
+                cm_entry->last_run = last_run;
+            }
+
+            if (time_threshold) {
+                cm_entry->time_threshold = time_threshold;
+            }
+            return;
+        }
+    }
+}
+
+static void
+cooperative_multitasking_yield_at__(const char *source_location)
+{
+    long long int now = time_msec();
+    struct cooperative_multitasking_callback *cm_entry;
+
+    HMAP_FOR_EACH (cm_entry, node, cooperative_multitasking_callbacks) {
+        long long int elapsed = now - cm_entry->last_run;
+
+        if (elapsed >= cm_entry->time_threshold) {
+            VLOG_DBG("yield called from %s: "
+                     "%lld: %lld >= %lld, executing %p(%p)",
+                     source_location, now, elapsed, cm_entry->time_threshold,
+                     cm_entry->cb, cm_entry->arg);
+            (*cm_entry->cb)(cm_entry->arg);
+            if (elapsed - cm_entry->time_threshold >
+                cm_entry->time_threshold / 8)
+            {
+                VLOG_WARN("yield threshold overrun with %lld msec. %s",
+                          elapsed - cm_entry->time_threshold,
+                          cm_entry->msg ? cm_entry->msg : "");
+                if (VLOG_IS_DBG_ENABLED()) {
+                    /* log_backtrace() logs at ERROR level but we only want to
+                     * log a backtrace when DEBUG is enabled */
+                    log_backtrace();
+                }
+            }
+        }
+    }
+}
+
+/* Iterate over registered callbacks and execute callbacks as demanded by the
+ * recorded time threshold. */
+void
+cooperative_multitasking_yield_at(const char *source_location)
+{
+    static bool yield_in_progress = false;
+
+    if (!cooperative_multitasking_callbacks) {
+        return;
+    }
+
+    if (yield_in_progress) {
+        VLOG_ERR_ONCE("nested yield avoided, this is a bug! "
+                      "enable debug logging for more details.");
+        if (VLOG_IS_DBG_ENABLED()) {
+            VLOG_DBG("nested yield, called from %s", source_location);
+            log_backtrace();
+        }
+        return;
+    }
+    yield_in_progress = true;
+
+    cooperative_multitasking_yield_at__(source_location);
+
+    yield_in_progress = false;
+}
diff --git a/lib/cooperative-multitasking.h b/lib/cooperative-multitasking.h
new file mode 100644
index 000000000..6286bfbf5
--- /dev/null
+++ b/lib/cooperative-multitasking.h
@@ -0,0 +1,42 @@ 
+/*
+ * Copyright (c) 2023 Canonical Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef COOPERATIVE_MULTITASKING_H
+#define COOPERATIVE_MULTITASKING_H 1
+
+struct hmap;
+
+void cooperative_multitasking_init(struct hmap *);
+
+void cooperative_multitasking_register(void (*)(void *), void *,
+                                       long long int, const char *);
+#define COOPERATIVE_MULTITASKING_REGISTER(CB, ARG, TIME_THRESHOLD, MSG)       \
+    cooperative_multitasking_register((void (*)(void *)) CB, (void *) ARG,    \
+                                      TIME_THRESHOLD, MSG)
+
+void cooperative_multitasking_destroy(void);
+
+void cooperative_multitasking_update(void (*)(void *), void *, long long int,
+                                     long long int);
+#define COOPERATIVE_MULTITASKING_UPDATE(CB, ARG, LAST_RUN, TIME_THRESHOLD)    \
+    cooperative_multitasking_update((void (*) (void *)) CB, (void *) ARG,     \
+                                    LAST_RUN, TIME_THRESHOLD)
+
+void cooperative_multitasking_yield_at(const char *);
+#define cooperative_multitasking_yield() \
+    cooperative_multitasking_yield_at(OVS_SOURCE_LOCATOR)
+
+#endif /* COOPERATIVE_MULTITASKING_H */
diff --git a/tests/automake.mk b/tests/automake.mk
index 10c9fbb01..08c9b74d4 100644
--- a/tests/automake.mk
+++ b/tests/automake.mk
@@ -456,6 +456,7 @@  tests_ovstest_SOURCES = \
 	tests/test-ccmap.c \
 	tests/test-cmap.c \
 	tests/test-conntrack.c \
+	tests/test-cooperative-multitasking.c \
 	tests/test-csum.c \
 	tests/test-flows.c \
 	tests/test-hash.c \
diff --git a/tests/library.at b/tests/library.at
index 3f9df2f87..77d5abb01 100644
--- a/tests/library.at
+++ b/tests/library.at
@@ -296,3 +296,13 @@  AT_CLEANUP
 AT_SETUP([uuidset module])
 AT_CHECK([ovstest test-uuidset], [0], [], [ignore])
 AT_CLEANUP
+
+AT_SETUP([cooperative-multitasking module])
+AT_CHECK([ovstest test-cooperative-multitasking], [0], [])
+AT_CLEANUP
+
+AT_SETUP([cooperative-multitasking module nested yield detection])
+AT_CHECK([ovstest test-cooperative-multitasking-nested-yield], [0], [], [dnl
+cooperative_multitasking|ERR|nested yield avoided, this is a bug! enable debug logging for more details.
+])
+AT_CLEANUP
diff --git a/tests/ovsdb-server.at b/tests/ovsdb-server.at
index 6eb758e22..88a9a9a27 100644
--- a/tests/ovsdb-server.at
+++ b/tests/ovsdb-server.at
@@ -2387,6 +2387,7 @@  m4_define([CLEAN_LOG_FILE],
   [sed 's/[[0-9\-]]*T[[0-9:\.]]*Z|[[0-9]]*\(|.*$\)/\1/g' $1 | dnl
    sed '/|poll_loop|/d' |   dnl
    sed '/|socket_util|/d' | dnl
+   sed '/|cooperative_multitasking|DBG|/d' | dnl
    sed 's/[[0-9]]*\.ctl/<cleared>\.ctl/g'> $2])
 
 CLEAN_LOG_FILE([1.log], [1.log.clear])
diff --git a/tests/test-cooperative-multitasking.c b/tests/test-cooperative-multitasking.c
new file mode 100644
index 000000000..ec6be865f
--- /dev/null
+++ b/tests/test-cooperative-multitasking.c
@@ -0,0 +1,259 @@ 
+/*
+ * Copyright (c) 2023 Canonical Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <config.h>
+#undef NDEBUG
+#include "cooperative-multitasking.h"
+#include "cooperative-multitasking-private.h"
+#include "openvswitch/hmap.h"
+#include "ovstest.h"
+#include "timeval.h"
+#include "util.h"
+#include "openvswitch/vlog.h"
+
+static struct hmap cm_callbacks;
+
+struct fixture_arg {
+    bool called;
+};
+
+static void
+fixture_run(struct fixture_arg *arg)
+{
+    COOPERATIVE_MULTITASKING_UPDATE(&fixture_run, arg, time_msec(), 0);
+    if (arg) {
+        arg->called = true;
+    }
+}
+
+static void
+fixture_other_run(struct fixture_arg *arg)
+{
+    COOPERATIVE_MULTITASKING_UPDATE(&fixture_other_run, arg, time_msec(), 0);
+    if (arg) {
+        arg->called = true;
+    }
+}
+
+static void
+test_cm_register(void)
+{
+    struct cooperative_multitasking_callback *cm_entry;
+    struct fixture_arg arg1 = {
+        .called = false,
+    };
+    struct fixture_arg arg2 = {
+        .called = false,
+    };
+
+    timeval_stop();
+    long long int now = time_msec();
+
+    COOPERATIVE_MULTITASKING_REGISTER(&fixture_run, &arg1, 1000, NULL);
+    COOPERATIVE_MULTITASKING_REGISTER(&fixture_run, &arg2, 2000, NULL);
+    COOPERATIVE_MULTITASKING_REGISTER(&fixture_other_run, NULL, 3000, NULL);
+
+    ovs_assert(hmap_count(&cm_callbacks) == 3);
+
+    HMAP_FOR_EACH (cm_entry, node, &cm_callbacks) {
+        if (cm_entry->arg == (void *)&arg1) {
+            ovs_assert (cm_entry->cb == (void (*)(void *)) &fixture_run);
+            ovs_assert (cm_entry->time_threshold == 1000);
+            ovs_assert (cm_entry->last_run == now);
+        } else if (cm_entry->arg == (void *)&arg2) {
+            ovs_assert (cm_entry->cb == (void (*)(void *)) &fixture_run);
+            ovs_assert (cm_entry->time_threshold == 2000);
+            ovs_assert (cm_entry->last_run == now);
+        } else if (cm_entry->cb == (void (*)(void *)) &fixture_other_run) {
+            ovs_assert (cm_entry->arg == NULL);
+            ovs_assert (cm_entry->time_threshold == 3000);
+            ovs_assert (cm_entry->last_run == now);
+        } else {
+            OVS_NOT_REACHED();
+        }
+    }
+
+    cooperative_multitasking_destroy();
+}
+
+static void
+test_cm_update(void)
+{
+    struct cooperative_multitasking_callback *cm_entry;
+    struct fixture_arg arg1 = {
+        .called = false,
+    };
+    struct fixture_arg arg2 = {
+        .called = false,
+    };
+
+    timeval_stop();
+    long long int now = time_msec();
+
+    /* first register a couple of callbacks. */
+    COOPERATIVE_MULTITASKING_REGISTER(&fixture_run, &arg1, 0, NULL);
+    COOPERATIVE_MULTITASKING_REGISTER(&fixture_run, &arg2, 0, NULL);
+
+    ovs_assert(hmap_count(&cm_callbacks) == 2);
+
+    HMAP_FOR_EACH (cm_entry, node, &cm_callbacks) {
+        if (cm_entry->arg == (void *)&arg1) {
+            ovs_assert (cm_entry->time_threshold == 0);
+            ovs_assert (cm_entry->last_run == now);
+        } else if (cm_entry->arg == (void *)&arg2) {
+            ovs_assert (cm_entry->time_threshold == 0);
+            ovs_assert (cm_entry->last_run == now);
+        } else {
+            OVS_NOT_REACHED();
+        }
+    }
+
+    /* update 'last_run' and 'time_threshold' for each callback and validate
+     * that the correct entry was actually updated. */
+    COOPERATIVE_MULTITASKING_UPDATE(&fixture_run, &arg1, 1, 2);
+    COOPERATIVE_MULTITASKING_UPDATE(&fixture_run, &arg2, 3, 4);
+
+    HMAP_FOR_EACH (cm_entry, node, &cm_callbacks) {
+        if (cm_entry->arg == (void *)&arg1) {
+            ovs_assert (cm_entry->time_threshold == 2);
+            ovs_assert (cm_entry->last_run == 1);
+        } else if (cm_entry->arg == (void *)&arg2) {
+            ovs_assert (cm_entry->time_threshold == 4);
+            ovs_assert (cm_entry->last_run == 3);
+        } else {
+            OVS_NOT_REACHED();
+        }
+    }
+
+    /* confirm that providing 0 for 'last_run' or 'time_threshold' leaves the
+     * existing value untouched. */
+    COOPERATIVE_MULTITASKING_UPDATE(&fixture_run, &arg1, 0, 5);
+    COOPERATIVE_MULTITASKING_UPDATE(&fixture_run, &arg2, 6, 0);
+
+    HMAP_FOR_EACH (cm_entry, node, &cm_callbacks) {
+        if (cm_entry->arg == (void *)&arg1) {
+            ovs_assert (cm_entry->time_threshold == 5);
+            ovs_assert (cm_entry->last_run == 1);
+        } else if (cm_entry->arg == (void *)&arg2) {
+            ovs_assert (cm_entry->time_threshold == 4);
+            ovs_assert (cm_entry->last_run == 6);
+        } else {
+            OVS_NOT_REACHED();
+        }
+    }
+
+    cooperative_multitasking_destroy();
+}
+
+static void
+test_cm_yield(void)
+{
+    struct cooperative_multitasking_callback *cm_entry;
+    struct fixture_arg arg1 = {
+        .called = false,
+    };
+    struct fixture_arg arg2 = {
+        .called = false,
+    };
+
+    timeval_stop();
+    long long int now = time_msec();
+
+    /* first register a couple of callbacks. */
+    COOPERATIVE_MULTITASKING_REGISTER(&fixture_run, &arg1, 1000, NULL);
+    COOPERATIVE_MULTITASKING_REGISTER(&fixture_run, &arg2, 2000, NULL);
+
+    ovs_assert(hmap_count(&cm_callbacks) == 2);
+
+    /* call to yield should not execute callbacks until time threshold. */
+    cooperative_multitasking_yield();
+    ovs_assert(arg1.called == false);
+    ovs_assert(arg2.called == false);
+
+    HMAP_FOR_EACH (cm_entry, node, &cm_callbacks) {
+        ovs_assert(cm_entry->last_run == now);
+    }
+
+    /* move clock forward and confirm the expected callbacks to be executed. */
+    timeval_warp(0, 1000);
+    timeval_stop();
+    cooperative_multitasking_yield();
+    ovs_assert(arg1.called == true);
+    ovs_assert(arg2.called == false);
+
+    /* move clock forward and confirm the expected callbacks to be executed. */
+    arg1.called = arg2.called = false;
+    timeval_warp(0, 1000);
+    timeval_stop();
+    cooperative_multitasking_yield();
+    ovs_assert(arg1.called == true);
+    ovs_assert(arg2.called == true);
+
+    timeval_warp(0, 1);
+    cooperative_multitasking_destroy();
+}
+
+static void
+fixture_buggy_run(struct fixture_arg *arg)
+{
+    COOPERATIVE_MULTITASKING_UPDATE(&fixture_buggy_run, arg, time_msec(), 0);
+    if (arg) {
+        arg->called = true;
+    }
+    /* A real run function MUST NOT directly or indirectly call yield, this is
+     * here to test the detection of such a programming error. */
+    cooperative_multitasking_yield();
+}
+
+static void
+test_cooperative_multitasking_nested_yield(int argc OVS_UNUSED, char *argv[])
+{
+    struct fixture_arg arg1 = {
+        .called = false,
+    };
+
+    set_program_name(argv[0]);
+    vlog_set_pattern(VLF_CONSOLE, "%c|%p|%m");
+    vlog_set_levels(NULL, VLF_SYSLOG, VLL_OFF);
+
+    time_msec(); /* ensure timeval is initialized */
+    timeval_timewarp_enable();
+
+    cooperative_multitasking_init(&cm_callbacks);
+
+    COOPERATIVE_MULTITASKING_REGISTER(&fixture_buggy_run, &arg1, 1000, NULL);
+    timeval_warp(0, 1000);
+    cooperative_multitasking_yield();
+    cooperative_multitasking_destroy();
+}
+
+static void
+test_cooperative_multitasking(int argc OVS_UNUSED, char *argv[] OVS_UNUSED)
+{
+    time_msec(); /* ensure timeval is initialized */
+    timeval_timewarp_enable();
+
+    cooperative_multitasking_init(&cm_callbacks);
+
+    test_cm_register();
+    test_cm_update();
+    test_cm_yield();
+}
+
+OVSTEST_REGISTER("test-cooperative-multitasking",
+                 test_cooperative_multitasking);
+OVSTEST_REGISTER("test-cooperative-multitasking-nested-yield",
+                 test_cooperative_multitasking_nested_yield);