@@ -20,6 +20,7 @@
#include <errno.h>
+#include "coverage.h"
#include "hash.h"
#include "jsonrpc.h"
#include "openvswitch/dynamic-string.h"
@@ -35,11 +36,14 @@
#include "ovsdb-types.h"
#include "sset.h"
#include "svec.h"
+#include "timeval.h"
#include "util.h"
#include "uuid.h"
VLOG_DEFINE_THIS_MODULE(ovsdb_cs);
+COVERAGE_DEFINE(ovsdb_cs_run_blocked);
+
/* Connection state machine.
*
* When a JSON-RPC session connects, the CS layer sends a "monitor_cond"
@@ -603,19 +607,9 @@ ovsdb_cs_db_add_event(struct ovsdb_cs_db *db, enum ovsdb_cs_event_type type)
return event;
}
-/* Processes a batch of messages from the database server on 'cs'. This may
- * cause the CS's contents to change.
- *
- * Initializes 'events' with a list of events that occurred on 'cs'. The
- * caller must process and destroy all of the events. */
-void
-ovsdb_cs_run(struct ovsdb_cs *cs, struct ovs_list *events)
+static void
+ovsdb_cs_run_prepare(struct ovsdb_cs *cs)
{
- ovs_list_init(events);
- if (!cs->session) {
- return;
- }
-
ovsdb_cs_send_cond_change(cs);
jsonrpc_session_run(cs->session);
@@ -638,6 +632,22 @@ ovsdb_cs_run(struct ovsdb_cs *cs, struct ovs_list *events)
ovsdb_cs_db_compose_lock_request(&cs->data));
}
}
+}
+
+/* Processes a batch of messages from the database server on 'cs'. This may
+ * cause the CS's contents to change.
+ *
+ * Initializes 'events' with a list of events that occurred on 'cs'. The
+ * caller must process and destroy all of the events. */
+void
+ovsdb_cs_run(struct ovsdb_cs *cs, struct ovs_list *events)
+{
+ ovs_list_init(events);
+ if (!cs->session) {
+ return;
+ }
+
+ ovsdb_cs_run_prepare(cs);
for (int i = 0; i < 50; i++) {
struct jsonrpc_msg *msg = jsonrpc_session_recv(cs->session);
@@ -650,6 +660,54 @@ ovsdb_cs_run(struct ovsdb_cs *cs, struct ovs_list *events)
ovs_list_push_back_all(events, &cs->data.events);
}
+/* Processes messages from the database server on 'cs' until the given
+ * deadline has passed. This may cause the CS's contents to change.
+ *
+ * Returns 0 on success.
+ *
+ * Returns EINVAL if the cs session is invalid.
+ *
+ * Returns EAGAIN if the underlying jsonrpc layer blocks.
+ *
+ * Initializes 'events' with a list of events that occurred on 'cs'. The
+ * caller must process and destroy all of the events. */
+int
+ovsdb_cs_run_until(struct ovsdb_cs *cs, struct ovs_list *events,
+ long long deadline)
+{
+ ovs_list_init(events);
+ if (!cs->session) {
+ return EINVAL;
+ }
+
+ ovsdb_cs_run_prepare(cs);
+
+ int ret;
+ while (time_msec() < deadline) {
+ struct jsonrpc_msg *msg = NULL;
+ ret = jsonrpc_session_recv_until(cs->session, &msg, deadline);
+ if (ret == EAGAIN) {
+ COVERAGE_INC(ovsdb_cs_run_blocked);
+ break;
+ }
+ /* Even if we would not block we might not receive a message for two
+ * reasons:
+ * 1. We did not yet receive the message fully and stopped reading.
+ * 2. The message was already handled by the jsonrpc layer. */
+ if (msg) {
+ ovsdb_cs_process_msg(cs, msg);
+ jsonrpc_msg_destroy(msg);
+ }
+ }
+ ovs_list_push_back_all(events, &cs->data.events);
+
+ if (ret == EAGAIN) {
+ return EAGAIN;
+ }
+
+ return 0;
+}
+
/* Arranges for poll_block() to wake up when ovsdb_cs_run() has something to
* do or when activity occurs on a transaction on 'cs'. */
void
@@ -119,6 +119,7 @@ struct ovsdb_cs *ovsdb_cs_create(const char *database, int max_version,
void ovsdb_cs_destroy(struct ovsdb_cs *);
void ovsdb_cs_run(struct ovsdb_cs *, struct ovs_list *events);
+int ovsdb_cs_run_until(struct ovsdb_cs *, struct ovs_list *events, long long);
void ovsdb_cs_wait(struct ovsdb_cs *);
/* Network connection. */
Adds a timing based variant of ovsdb_cs_run() that accepts an additional 'deadline' parameter and a new coverage counter 'ovsdb_cs_run_blocked' that tracks how often the call returned early because it was blocked with EAGAIN. ovsdb_cs_run_until() will process messages until the given deadline has passed and allows clients to batch messages together. In contrast to ovsdb_cs_run(), it will not abort the batch if the underlying JSON layer has already processed the message (e.g., answering an echo request). Since both ovsdb_cs_run() and ovsdb_cs_run_until() perform the same preparation steps, they were extracted into a dedicated function. Signed-off-by: Martin Morgenstern <martin.morgenstern@cloudandheat.com> --- lib/ovsdb-cs.c | 82 ++++++++++++++++++++++++++++++++++++++++++-------- lib/ovsdb-cs.h | 1 + 2 files changed, 71 insertions(+), 12 deletions(-)