diff mbox series

[ovs-dev,v3,2/5] lib: Introduce cooperative multitasking module.

Message ID 20240116225205.38112-3-frode.nordahl@canonical.com
State Accepted
Commit 3c8a4e942d6a15faa44f2e283d436bbf57c5b841
Delegated to: Ilya Maximets
Headers show
Series Introduce cooperative multitasking to improve OVSDB RAFT cluster operation. | expand

Checks

Context Check Description
ovsrobot/intel-ovs-compilation fail test: fail
ovsrobot/apply-robot success apply and check: success
ovsrobot/github-robot-_Build_and_Test success github build: passed

Commit Message

Frode Nordahl Jan. 16, 2024, 10:52 p.m. UTC
One of the goals of Open vSwitch is to be as resource efficient as
possible.  Core parts of the program has been implemented as
asynchronous state machines, and when absolutely necessary
additional threads are used.

Introduce cooperative multitasking module which allow us to
interleave important processing with long running tasks while
avoiding the additional resource consumption of threads and
complexity of asynchronous state machines.

We will use this module to ensure long running processing in the
OVSDB server does not interfere with stable maintenance of the
RAFT cluster in subsequent patches.

Suggested-by: Ilya Maximets <i.maximets@ovn.org>
Signed-off-by: Frode Nordahl <frode.nordahl@canonical.com>
---
 lib/automake.mk                        |   3 +
 lib/cooperative-multitasking-private.h |  33 +++
 lib/cooperative-multitasking.c         | 157 +++++++++++++
 lib/cooperative-multitasking.h         | 115 +++++++++
 tests/automake.mk                      |   1 +
 tests/library.at                       |  10 +
 tests/ovsdb-server.at                  |   1 +
 tests/test-cooperative-multitasking.c  | 307 +++++++++++++++++++++++++
 8 files changed, 627 insertions(+)
 create mode 100644 lib/cooperative-multitasking-private.h
 create mode 100644 lib/cooperative-multitasking.c
 create mode 100644 lib/cooperative-multitasking.h
 create mode 100644 tests/test-cooperative-multitasking.c
diff mbox series

Patch

diff --git a/lib/automake.mk b/lib/automake.mk
index 0dc8a35cc..8596171c6 100644
--- a/lib/automake.mk
+++ b/lib/automake.mk
@@ -94,6 +94,9 @@  lib_libopenvswitch_la_SOURCES = \
 	lib/conntrack-other.c \
 	lib/conntrack.c \
 	lib/conntrack.h \
+	lib/cooperative-multitasking.c \
+	lib/cooperative-multitasking.h \
+	lib/cooperative-multitasking-private.h \
 	lib/coverage.c \
 	lib/coverage.h \
 	lib/cpu.c \
diff --git a/lib/cooperative-multitasking-private.h b/lib/cooperative-multitasking-private.h
new file mode 100644
index 000000000..cb8382377
--- /dev/null
+++ b/lib/cooperative-multitasking-private.h
@@ -0,0 +1,33 @@ 
+/*
+ * Copyright (c) 2024 Canonical Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef COOPERATIVE_MULTITASKING_PRIVATE_H
+#define COOPERATIVE_MULTITASKING_PRIVATE_H 1
+
+#include "openvswitch/hmap.h"
+
+extern struct hmap cooperative_multitasking_callbacks;
+
+struct cm_entry {
+    struct hmap_node node;
+    void (*cb)(void *);
+    void *arg;
+    long long int threshold;
+    long long int last_run;
+    const char *name;
+};
+
+#endif /* COOPERATIVE_MULTITASKING_PRIVATE_H */
diff --git a/lib/cooperative-multitasking.c b/lib/cooperative-multitasking.c
new file mode 100644
index 000000000..3a91af26f
--- /dev/null
+++ b/lib/cooperative-multitasking.c
@@ -0,0 +1,157 @@ 
+/*
+ * Copyright (c) 2024 Canonical Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <config.h>
+
+#include "backtrace.h"
+#include "cooperative-multitasking-private.h"
+#include "cooperative-multitasking.h"
+#include "hash.h"
+#include "openvswitch/hmap.h"
+#include "openvswitch/vlog.h"
+#include "timeval.h"
+
+VLOG_DEFINE_THIS_MODULE(cooperative_multitasking);
+
+struct hmap cooperative_multitasking_callbacks = HMAP_INITIALIZER(
+    &cooperative_multitasking_callbacks);
+
+/* Free any data allocated by calls to cooperative_multitasking_set(). */
+void
+cooperative_multitasking_destroy(void)
+{
+    struct cm_entry *cm_entry;
+    HMAP_FOR_EACH_SAFE (cm_entry, node, &cooperative_multitasking_callbacks) {
+        hmap_remove(&cooperative_multitasking_callbacks, &cm_entry->node);
+        free(cm_entry);
+    }
+}
+
+/* Set/update callback as identified by 'cb' and 'arg'.
+ *
+ * 'name' is used for logging events related to this callback.
+ *
+ * The value for 'last_run' must be updated each time the callback is run.
+ *
+ * Updating the value for 'threshold' may be necessary as a consequence of
+ * change in runtime configuration or requirements of the part of the program
+ * serviced by the callback.
+ *
+ * Providing a value of 0 for 'last_run' or 'threshold' will leave the stored
+ * value untouched. */
+void
+cooperative_multitasking_set(void (*cb)(void *), void *arg,
+                             long long int last_run, long long int threshold,
+                             const char *name)
+{
+    struct cm_entry *cm_entry;
+
+    HMAP_FOR_EACH_WITH_HASH (cm_entry, node, hash_pointer((void *) cb, 0),
+                             &cooperative_multitasking_callbacks) {
+        if (cm_entry->cb == cb && cm_entry->arg == arg) {
+            if (last_run) {
+                cm_entry->last_run = last_run;
+            }
+
+            if (threshold) {
+                cm_entry->threshold = threshold;
+            }
+            return;
+        }
+    }
+
+    cm_entry = xzalloc(sizeof *cm_entry);
+    cm_entry->cb = cb;
+    cm_entry->arg = arg;
+    cm_entry->threshold = threshold;
+    cm_entry->last_run = last_run ? last_run : time_msec();
+    cm_entry->name = name;
+
+    hmap_insert(&cooperative_multitasking_callbacks,
+                &cm_entry->node, hash_pointer((void *) cm_entry->cb, 0));
+}
+
+/* Remove callback identified by 'cb' and 'arg'. */
+void
+cooperative_multitasking_remove(void (*cb)(void *), void *arg)
+{
+    struct cm_entry *cm_entry;
+
+    HMAP_FOR_EACH_WITH_HASH (cm_entry, node, hash_pointer((void *) cb, 0),
+                             &cooperative_multitasking_callbacks) {
+        if (cm_entry->cb == cb && cm_entry->arg == arg) {
+            hmap_remove(&cooperative_multitasking_callbacks, &cm_entry->node);
+            free(cm_entry);
+            return;
+        }
+    }
+}
+
+static void
+cooperative_multitasking_yield_at__(const char *source_location)
+{
+    long long int start = time_msec();
+    struct cm_entry *cm_entry;
+    long long int elapsed;
+    bool warn;
+
+    HMAP_FOR_EACH (cm_entry, node, &cooperative_multitasking_callbacks) {
+        elapsed = time_msec() - cm_entry->last_run;
+
+        if (elapsed >= cm_entry->threshold) {
+            warn = elapsed - cm_entry->threshold > cm_entry->threshold / 8;
+
+            VLOG(warn ? VLL_WARN : VLL_DBG, "%s: yield for %s(%p): "
+                 "elapsed(%lld) >= threshold(%lld), overrun: %lld",
+                 source_location, cm_entry->name, cm_entry->arg, elapsed,
+                 cm_entry->threshold, elapsed - cm_entry->threshold);
+
+            if (warn && VLOG_IS_DBG_ENABLED()) {
+                log_backtrace();
+            }
+
+            (*cm_entry->cb)(cm_entry->arg);
+        }
+    }
+
+    elapsed = time_msec() - start;
+    if (elapsed > 1000) {
+        VLOG_WARN("Unreasonably long %lldms runtime for callbacks.", elapsed);
+    }
+}
+
+/* Iterate over registered callbacks and execute callbacks as demanded by the
+ * recorded time threshold. */
+void
+cooperative_multitasking_yield_at(const char *source_location)
+{
+    static bool yield_in_progress = false;
+
+    if (yield_in_progress) {
+        VLOG_ERR_ONCE("Nested yield avoided, this is a bug! "
+                      "Enable debug logging for more details.");
+        if (VLOG_IS_DBG_ENABLED()) {
+            VLOG_DBG("%s: nested yield.", source_location);
+            log_backtrace();
+        }
+        return;
+    }
+    yield_in_progress = true;
+
+    cooperative_multitasking_yield_at__(source_location);
+
+    yield_in_progress = false;
+}
diff --git a/lib/cooperative-multitasking.h b/lib/cooperative-multitasking.h
new file mode 100644
index 000000000..1bbb59944
--- /dev/null
+++ b/lib/cooperative-multitasking.h
@@ -0,0 +1,115 @@ 
+/*
+ * Copyright (c) 2024 Canonical Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef COOPERATIVE_MULTITASKING_H
+#define COOPERATIVE_MULTITASKING_H 1
+
+/*
+ * cooperative-multitasking, interleaved execution for Open vSwitch.
+ *
+ * Overview
+ * ========
+ *
+ * One of the goals of Open vSwitch is to be as resource efficient as
+ * possible.  Core parts of the program has been implemented as asynchronous
+ * state machines, and when absolutely necessary additional threads are used.
+ *
+ * Modules with mostly synchronous and single threaded code that are expected
+ * to have heavy processing, can make use of the cooperative-multitasking
+ * interface to yield to modules that have registered callbacks at a time
+ * threshold.
+ *
+ * Typical Usage
+ * =============
+ *
+ * The module that provides the callback typically has a run() function that is
+ * already part of the main processing loop and can then register like this:
+ *
+ * static void my_run_cb(void *arg);
+ *
+ * static void
+ * my_run(struct data *my_data)
+ * {
+ *     ...
+ *
+ *     cooperative_multitasking_set(&my_run_cb, (void *) my_data,
+ *                                  time_msec(), 1000, "my_run");
+ * }
+ *
+ * static void
+ * my_run_cb (void *arg)
+ * {
+ *     struct data *my_data = (struct data *) arg;
+ *
+ *     my_run(my_data);
+ * }
+ *
+ * static void
+ * my_destroy(struct data *my_data)
+ * {
+ *     ...
+ *
+ *     cooperatrive_multitasking_remove(&my_run_cb, (void *) my_data);
+ * }
+ *
+ * The module that is expected to have heavy processing can yield like this:
+ *
+ * HMAP_FOR_EACH (row, hmap_node, &src_table->rows) {
+ *     cooperative_multitasking_yield();
+ *
+ *     ...
+ * }
+ *
+ * Rules for implementation
+ * ========================
+ *
+ * - The module that registers itself with a callback must not use the yield
+ *   functionality inside nor should it be possible to do so via calls to other
+ *   modules.
+ *
+ * - The module that registers the callback should be self-sufficient, i.e.
+ *   the internal state of that module should not matter to the outside world,
+ *   at least it should not matter for the call stack that enters the
+ *   cooperative_multitasking_yield().
+ *
+ * - cooperative_multitasking_yield() must not be called from places that can
+ *   loop indefinitely, only in places that eventually end, otherwise it may
+ *   give a false impression that the server is working fine while it is stuck
+ *   and not actually doing any useful work.
+ *
+ * Thread-safety
+ * =============
+ *
+ * The cooperative-multitasking module and functions therein are not thread
+ * safe and must only be used by one thread.
+ */
+
+struct hmap;
+
+void cooperative_multitasking_destroy(void);
+
+void cooperative_multitasking_set(void (*cb)(void *), void *arg,
+                                  long long int last_run,
+                                  long long int threshold,
+                                  const char *name);
+
+void cooperative_multitasking_remove(void (*cb)(void *), void *arg);
+
+void cooperative_multitasking_yield_at(const char *source_location);
+#define cooperative_multitasking_yield() \
+    cooperative_multitasking_yield_at(OVS_SOURCE_LOCATOR)
+
+#endif /* COOPERATIVE_MULTITASKING_H */
diff --git a/tests/automake.mk b/tests/automake.mk
index 10c9fbb01..08c9b74d4 100644
--- a/tests/automake.mk
+++ b/tests/automake.mk
@@ -456,6 +456,7 @@  tests_ovstest_SOURCES = \
 	tests/test-ccmap.c \
 	tests/test-cmap.c \
 	tests/test-conntrack.c \
+	tests/test-cooperative-multitasking.c \
 	tests/test-csum.c \
 	tests/test-flows.c \
 	tests/test-hash.c \
diff --git a/tests/library.at b/tests/library.at
index 3f9df2f87..7b4acebb8 100644
--- a/tests/library.at
+++ b/tests/library.at
@@ -296,3 +296,13 @@  AT_CLEANUP
 AT_SETUP([uuidset module])
 AT_CHECK([ovstest test-uuidset], [0], [], [ignore])
 AT_CLEANUP
+
+AT_SETUP([cooperative-multitasking module])
+AT_CHECK([ovstest test-cooperative-multitasking], [0], [])
+AT_CLEANUP
+
+AT_SETUP([cooperative-multitasking module nested yield detection])
+AT_CHECK([ovstest test-cooperative-multitasking-nested-yield], [0], [], [dnl
+cooperative_multitasking|ERR|Nested yield avoided, this is a bug! Enable debug logging for more details.
+])
+AT_CLEANUP
diff --git a/tests/ovsdb-server.at b/tests/ovsdb-server.at
index 036e4cc3b..e152a153c 100644
--- a/tests/ovsdb-server.at
+++ b/tests/ovsdb-server.at
@@ -2845,6 +2845,7 @@  m4_define([CLEAN_LOG_FILE],
   [sed 's/[[0-9\-]]*T[[0-9:\.]]*Z|[[0-9]]*\(|.*$\)/\1/g' $1 | dnl
    sed '/|poll_loop|/d' |   dnl
    sed '/|socket_util|/d' | dnl
+   sed '/|cooperative_multitasking|DBG|/d' | dnl
    sed 's/[[0-9]]*\.ctl/<cleared>\.ctl/g'> $2])
 
 CLEAN_LOG_FILE([1.log], [1.log.clear])
diff --git a/tests/test-cooperative-multitasking.c b/tests/test-cooperative-multitasking.c
new file mode 100644
index 000000000..f7407bb03
--- /dev/null
+++ b/tests/test-cooperative-multitasking.c
@@ -0,0 +1,307 @@ 
+/*
+ * Copyright (c) 2023 Canonical Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <config.h>
+#undef NDEBUG
+#include "cooperative-multitasking.h"
+#include "cooperative-multitasking-private.h"
+#include "openvswitch/hmap.h"
+#include "ovstest.h"
+#include "timeval.h"
+#include "util.h"
+#include "openvswitch/vlog.h"
+
+struct fixture_arg {
+    bool called;
+};
+
+static void fixture_run_wrap(void *arg);
+
+#define FIXTURE_RUN_NAME "fixture_run"
+
+static void
+fixture_run(struct fixture_arg *arg)
+{
+    cooperative_multitasking_set(&fixture_run_wrap, (void *) arg,
+                                 time_msec(), 0, FIXTURE_RUN_NAME);
+    if (arg) {
+        arg->called = true;
+    }
+}
+
+static void
+fixture_run_wrap(void *arg)
+{
+    struct fixture_arg *fixture_arg = (struct fixture_arg *) arg;
+
+    fixture_run(fixture_arg);
+}
+
+
+static void fixture_other_run_wrap(void *arg);
+
+#define FIXTURE_OTHER_RUN_NAME "fixture_other_run"
+
+static void
+fixture_other_run(struct fixture_arg *arg)
+{
+    cooperative_multitasking_set(&fixture_other_run_wrap, (void *) arg,
+                                 time_msec(), 0, FIXTURE_OTHER_RUN_NAME);
+    if (arg) {
+        arg->called = true;
+    }
+}
+
+static void
+fixture_other_run_wrap(void *arg)
+{
+    struct fixture_arg *fixture_arg = (struct fixture_arg *) arg;
+
+    fixture_other_run(fixture_arg);
+}
+
+static void
+test_cm_set_registration(void)
+{
+    struct cm_entry *cm_entry;
+    struct fixture_arg arg1 = {
+        .called = false,
+    };
+    struct fixture_arg arg2 = {
+        .called = false,
+    };
+
+    timeval_stop();
+    long long int now = time_msec();
+
+    cooperative_multitasking_set(&fixture_run_wrap, (void *) &arg1, 0, 1000,
+                                 FIXTURE_RUN_NAME);
+    cooperative_multitasking_set(&fixture_run_wrap, (void *) &arg2, 0, 2000,
+                                 FIXTURE_RUN_NAME);
+    cooperative_multitasking_set(&fixture_other_run_wrap, NULL, 0, 3000,
+                                 FIXTURE_OTHER_RUN_NAME);
+
+    ovs_assert(hmap_count(&cooperative_multitasking_callbacks) == 3);
+
+    HMAP_FOR_EACH (cm_entry, node, &cooperative_multitasking_callbacks) {
+        if (cm_entry->arg == (void *) &arg1) {
+            ovs_assert(cm_entry->cb == &fixture_run_wrap);
+            ovs_assert(cm_entry->threshold == 1000);
+            ovs_assert(cm_entry->last_run == now);
+        } else if (cm_entry->arg == (void *) &arg2) {
+            ovs_assert(cm_entry->cb == &fixture_run_wrap);
+            ovs_assert(cm_entry->threshold == 2000);
+            ovs_assert(cm_entry->last_run == now);
+        } else if (cm_entry->cb == &fixture_other_run_wrap) {
+            ovs_assert(cm_entry->arg == NULL);
+            ovs_assert(cm_entry->threshold == 3000);
+            ovs_assert(cm_entry->last_run == now);
+        } else {
+            OVS_NOT_REACHED();
+        }
+    }
+
+    cooperative_multitasking_remove(&fixture_other_run_wrap, NULL);
+    ovs_assert(hmap_count(&cooperative_multitasking_callbacks) == 2);
+    cooperative_multitasking_remove(&fixture_run_wrap, (void *) &arg2);
+    ovs_assert(hmap_count(&cooperative_multitasking_callbacks) == 1);
+
+    cooperative_multitasking_destroy();
+}
+
+static void
+test_cm_set_update(void)
+{
+    struct cm_entry *cm_entry;
+    struct fixture_arg arg1 = {
+        .called = false,
+    };
+    struct fixture_arg arg2 = {
+        .called = false,
+    };
+
+    timeval_stop();
+    long long int now = time_msec();
+
+    /* First register a couple of callbacks. */
+    cooperative_multitasking_set(&fixture_run_wrap, (void *) &arg1, 0, 0,
+                                 FIXTURE_RUN_NAME);
+    cooperative_multitasking_set(&fixture_run_wrap, (void *) &arg2, 0, 0,
+                                 FIXTURE_RUN_NAME);
+
+    ovs_assert(hmap_count(&cooperative_multitasking_callbacks) == 2);
+
+    HMAP_FOR_EACH (cm_entry, node, &cooperative_multitasking_callbacks) {
+        if (cm_entry->arg == (void *) &arg1) {
+            ovs_assert(cm_entry->threshold == 0);
+            ovs_assert(cm_entry->last_run == now);
+        } else if (cm_entry->arg == (void *) &arg2) {
+            ovs_assert(cm_entry->threshold == 0);
+            ovs_assert(cm_entry->last_run == now);
+        } else {
+            OVS_NOT_REACHED();
+        }
+    }
+
+    /* Update 'last_run' and 'threshold' for each callback and validate
+     * that the correct entry was actually updated. */
+    cooperative_multitasking_set(&fixture_run_wrap, (void *) &arg1, 1, 2,
+                                 FIXTURE_RUN_NAME);
+    cooperative_multitasking_set(&fixture_run_wrap, (void *) &arg2, 3, 4,
+                                 FIXTURE_RUN_NAME);
+
+    HMAP_FOR_EACH (cm_entry, node, &cooperative_multitasking_callbacks) {
+        if (cm_entry->arg == (void *) &arg1) {
+            ovs_assert(cm_entry->threshold == 2);
+            ovs_assert(cm_entry->last_run == 1);
+        } else if (cm_entry->arg == (void *) &arg2) {
+            ovs_assert(cm_entry->threshold == 4);
+            ovs_assert(cm_entry->last_run == 3);
+        } else {
+            OVS_NOT_REACHED();
+        }
+    }
+
+    /* Confirm that providing 0 for 'last_run' or 'threshold' leaves the
+     * existing value untouched. */
+    cooperative_multitasking_set(&fixture_run_wrap, (void *) &arg1, 0, 5,
+                                 FIXTURE_RUN_NAME);
+    cooperative_multitasking_set(&fixture_run_wrap, (void *) &arg2, 6, 0,
+                                 FIXTURE_RUN_NAME);
+
+    HMAP_FOR_EACH (cm_entry, node, &cooperative_multitasking_callbacks) {
+        if (cm_entry->arg == (void *) &arg1) {
+            ovs_assert(cm_entry->threshold == 5);
+            ovs_assert(cm_entry->last_run == 1);
+        } else if (cm_entry->arg == (void *) &arg2) {
+            ovs_assert(cm_entry->threshold == 4);
+            ovs_assert(cm_entry->last_run == 6);
+        } else {
+            OVS_NOT_REACHED();
+        }
+    }
+
+    cooperative_multitasking_destroy();
+}
+
+static void
+test_cm_yield(void)
+{
+    struct cm_entry *cm_entry;
+    struct fixture_arg arg1 = {
+        .called = false,
+    };
+    struct fixture_arg arg2 = {
+        .called = false,
+    };
+
+    timeval_stop();
+    long long int now = time_msec();
+
+    /* First register a couple of callbacks. */
+    cooperative_multitasking_set(&fixture_run_wrap, (void *) &arg1, 0, 1000,
+                                 FIXTURE_RUN_NAME);
+    cooperative_multitasking_set(&fixture_run_wrap, (void *) &arg2, 0, 2000,
+                                 FIXTURE_RUN_NAME);
+
+    ovs_assert(hmap_count(&cooperative_multitasking_callbacks) == 2);
+
+    /* Call to yield should not execute callbacks until time threshold. */
+    cooperative_multitasking_yield();
+    ovs_assert(arg1.called == false);
+    ovs_assert(arg2.called == false);
+
+    HMAP_FOR_EACH (cm_entry, node, &cooperative_multitasking_callbacks) {
+        ovs_assert(cm_entry->last_run == now);
+    }
+
+    /* Move clock forward and confirm the expected callbacks to be executed. */
+    timeval_warp(1000);
+    timeval_stop();
+    cooperative_multitasking_yield();
+    ovs_assert(arg1.called == true);
+    ovs_assert(arg2.called == false);
+
+    /* Move clock forward and confirm the expected callbacks to be executed. */
+    arg1.called = arg2.called = false;
+    timeval_warp(1000);
+    timeval_stop();
+    cooperative_multitasking_yield();
+    ovs_assert(arg1.called == true);
+    ovs_assert(arg2.called == true);
+
+    cooperative_multitasking_destroy();
+}
+
+static void fixture_buggy_run_wrap(void *arg);
+
+#define FIXTURE_BUGGY_RUN_NAME "fixture_buggy_run"
+
+static void
+fixture_buggy_run(struct fixture_arg *arg)
+{
+    cooperative_multitasking_set(&fixture_buggy_run_wrap, (void *) arg,
+                                 time_msec(), 0, FIXTURE_BUGGY_RUN_NAME);
+    if (arg) {
+        arg->called = true;
+    }
+    /* A real run function MUST NOT directly or indirectly call yield, this is
+     * here to test the detection of such a programming error. */
+    cooperative_multitasking_yield();
+}
+
+static void
+fixture_buggy_run_wrap(void *arg)
+{
+    struct fixture_arg *fixture_arg = (struct fixture_arg *) arg;
+
+    fixture_buggy_run(fixture_arg);
+}
+
+static void
+test_cooperative_multitasking_nested_yield(int argc OVS_UNUSED, char *argv[])
+{
+    struct fixture_arg arg1 = {
+        .called = false,
+    };
+
+    set_program_name(argv[0]);
+    vlog_set_pattern(VLF_CONSOLE, "%c|%p|%m");
+    vlog_set_levels(NULL, VLF_SYSLOG, VLL_OFF);
+
+    time_msec(); /* Ensure timeval is initialized. */
+
+    cooperative_multitasking_set(&fixture_buggy_run_wrap, (void *) &arg1,
+                                 0, 1000, FIXTURE_BUGGY_RUN_NAME);
+    timeval_warp(1000);
+    cooperative_multitasking_yield();
+    cooperative_multitasking_destroy();
+}
+
+static void
+test_cooperative_multitasking(int argc OVS_UNUSED, char *argv[] OVS_UNUSED)
+{
+    time_msec(); /* Ensure timeval is initialized. */
+
+    test_cm_set_registration();
+    test_cm_set_update();
+    test_cm_yield();
+}
+
+OVSTEST_REGISTER("test-cooperative-multitasking",
+                 test_cooperative_multitasking);
+OVSTEST_REGISTER("test-cooperative-multitasking-nested-yield",
+                 test_cooperative_multitasking_nested_yield);