diff mbox series

[ovs-dev,4/5] ovsdb-cs: Add ovsdb_cs_run_until() function.

Message ID 20250507134151.118615-5-martin.morgenstern@cloudandheat.com
State New
Delegated to: Ilya Maximets
Headers show
Series Timing API for jsonrpc, ovsdb-cs and ovsdb-idl. | expand

Checks

Context Check Description
ovsrobot/apply-robot success apply and check: success
ovsrobot/github-robot-_Build_and_Test success github build: passed

Commit Message

Martin Morgenstern May 7, 2025, 1:41 p.m. UTC
Adds a timing based variant of ovsdb_cs_run() that accepts an additional
'deadline' parameter and a new coverage counter 'ovsdb_cs_run_blocked'
that tracks how often the call returned early because it was blocked
with EAGAIN.

ovsdb_cs_run_until() will process messages until the given deadline has
passed and allows clients to batch messages together.  In contrast to
ovsdb_cs_run(), it will not abort the batch if the underlying JSON layer
has already processed the message (e.g., answering an echo request).

Since both ovsdb_cs_run() and ovsdb_cs_run_until() perform the same
preparation steps, they were extracted into a dedicated function.

Signed-off-by: Martin Morgenstern <martin.morgenstern@cloudandheat.com>
---
 lib/ovsdb-cs.c | 82 ++++++++++++++++++++++++++++++++++++++++++--------
 lib/ovsdb-cs.h |  1 +
 2 files changed, 71 insertions(+), 12 deletions(-)
diff mbox series

Patch

diff --git a/lib/ovsdb-cs.c b/lib/ovsdb-cs.c
index b5eda88ad..36ffd1ed9 100644
--- a/lib/ovsdb-cs.c
+++ b/lib/ovsdb-cs.c
@@ -20,6 +20,7 @@ 
 
 #include <errno.h>
 
+#include "coverage.h"
 #include "hash.h"
 #include "jsonrpc.h"
 #include "openvswitch/dynamic-string.h"
@@ -35,11 +36,14 @@ 
 #include "ovsdb-types.h"
 #include "sset.h"
 #include "svec.h"
+#include "timeval.h"
 #include "util.h"
 #include "uuid.h"
 
 VLOG_DEFINE_THIS_MODULE(ovsdb_cs);
 
+COVERAGE_DEFINE(ovsdb_cs_run_blocked);
+
 /* Connection state machine.
  *
  * When a JSON-RPC session connects, the CS layer sends a "monitor_cond"
@@ -603,19 +607,9 @@  ovsdb_cs_db_add_event(struct ovsdb_cs_db *db, enum ovsdb_cs_event_type type)
     return event;
 }
 
-/* Processes a batch of messages from the database server on 'cs'.  This may
- * cause the CS's contents to change.
- *
- * Initializes 'events' with a list of events that occurred on 'cs'.  The
- * caller must process and destroy all of the events. */
-void
-ovsdb_cs_run(struct ovsdb_cs *cs, struct ovs_list *events)
+static void
+ovsdb_cs_run_prepare(struct ovsdb_cs *cs)
 {
-    ovs_list_init(events);
-    if (!cs->session) {
-        return;
-    }
-
     ovsdb_cs_send_cond_change(cs);
 
     jsonrpc_session_run(cs->session);
@@ -638,6 +632,22 @@  ovsdb_cs_run(struct ovsdb_cs *cs, struct ovs_list *events)
                 ovsdb_cs_db_compose_lock_request(&cs->data));
         }
     }
+}
+
+/* Processes a batch of messages from the database server on 'cs'.  This may
+ * cause the CS's contents to change.
+ *
+ * Initializes 'events' with a list of events that occurred on 'cs'.  The
+ * caller must process and destroy all of the events. */
+void
+ovsdb_cs_run(struct ovsdb_cs *cs, struct ovs_list *events)
+{
+    ovs_list_init(events);
+    if (!cs->session) {
+        return;
+    }
+
+    ovsdb_cs_run_prepare(cs);
 
     for (int i = 0; i < 50; i++) {
         struct jsonrpc_msg *msg = jsonrpc_session_recv(cs->session);
@@ -650,6 +660,54 @@  ovsdb_cs_run(struct ovsdb_cs *cs, struct ovs_list *events)
     ovs_list_push_back_all(events, &cs->data.events);
 }
 
+/* Processes messages from the database server on 'cs' until the given
+ * deadline has passed.  This may cause the CS's contents to change.
+ *
+ * Returns 0 on success.
+ *
+ * Returns EINVAL if the cs session is invalid.
+ *
+ * Returns EAGAIN if the underlying jsonrpc layer blocks.
+ *
+ * Initializes 'events' with a list of events that occurred on 'cs'.  The
+ * caller must process and destroy all of the events. */
+int
+ovsdb_cs_run_until(struct ovsdb_cs *cs, struct ovs_list *events,
+                   long long deadline)
+{
+    ovs_list_init(events);
+    if (!cs->session) {
+        return EINVAL;
+    }
+
+    ovsdb_cs_run_prepare(cs);
+
+    int ret;
+    while (time_msec() < deadline) {
+        struct jsonrpc_msg *msg = NULL;
+        ret = jsonrpc_session_recv_until(cs->session, &msg, deadline);
+        if (ret == EAGAIN) {
+            COVERAGE_INC(ovsdb_cs_run_blocked);
+            break;
+        }
+        /* Even if we would not block we might not receive a message for two
+         * reasons:
+         *   1. We did not yet receive the message fully and stopped reading.
+         *   2. The message was already handled by the jsonrpc layer. */
+        if (msg) {
+            ovsdb_cs_process_msg(cs, msg);
+            jsonrpc_msg_destroy(msg);
+        }
+    }
+    ovs_list_push_back_all(events, &cs->data.events);
+
+    if (ret == EAGAIN) {
+        return EAGAIN;
+    }
+
+    return 0;
+}
+
 /* Arranges for poll_block() to wake up when ovsdb_cs_run() has something to
  * do or when activity occurs on a transaction on 'cs'. */
 void
diff --git a/lib/ovsdb-cs.h b/lib/ovsdb-cs.h
index bcc3dcd71..ff6438bdf 100644
--- a/lib/ovsdb-cs.h
+++ b/lib/ovsdb-cs.h
@@ -119,6 +119,7 @@  struct ovsdb_cs *ovsdb_cs_create(const char *database, int max_version,
 void ovsdb_cs_destroy(struct ovsdb_cs *);
 
 void ovsdb_cs_run(struct ovsdb_cs *, struct ovs_list *events);
+int ovsdb_cs_run_until(struct ovsdb_cs *, struct ovs_list *events, long long);
 void ovsdb_cs_wait(struct ovsdb_cs *);
 
 /* Network connection. */