@@ -42,6 +42,7 @@
#include "trigger.h"
#include "util.h"
#include "openvswitch/vlog.h"
+#include "random.h"
VLOG_DEFINE_THIS_MODULE(ovsdb_jsonrpc_server);
@@ -60,7 +61,8 @@ static struct ovsdb_jsonrpc_session *ovsdb_jsonrpc_session_create(
struct ovsdb_jsonrpc_remote *, struct jsonrpc_session *, bool);
static void ovsdb_jsonrpc_session_preremove_db(struct ovsdb_jsonrpc_remote *,
struct ovsdb *);
-static void ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *);
+static void ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *,
+ uint64_t limit);
static void ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_remote *);
static void ovsdb_jsonrpc_session_get_memory_usage_all(
const struct ovsdb_jsonrpc_remote *, struct simap *usage);
@@ -128,6 +130,11 @@ struct ovsdb_jsonrpc_server {
bool read_only; /* This server is does not accept any
transactions that can modify the database. */
struct shash remotes; /* Contains "struct ovsdb_jsonrpc_remote *"s. */
+ struct ovsdb_jsonrpc_remote *skip_to; /* Pointer to remote where processing
+ should restart after a time
+ constraint interruption. */
+ bool must_wake_up; /* The processing loop must be re-run. It was
+ interrupted due to exceeding a time constraint. */
};
/* A configured remote. This is either a passive stream listener plus a list
@@ -140,6 +147,7 @@ struct ovsdb_jsonrpc_remote {
uint8_t dscp;
bool read_only;
char *role;
+ ssize_t count;
};
static struct ovsdb_jsonrpc_remote *ovsdb_jsonrpc_server_add_remote(
@@ -293,6 +301,9 @@ ovsdb_jsonrpc_server_del_remote(struct shash_node *node)
{
struct ovsdb_jsonrpc_remote *remote = node->data;
+ /* The safest option is to rerun all remotes. */
+ remote->server->skip_to = NULL;
+
ovsdb_jsonrpc_session_close_all(remote);
pstream_close(remote->listener);
shash_delete(&remote->server->remotes, node);
@@ -378,12 +389,26 @@ ovsdb_jsonrpc_server_set_read_only(struct ovsdb_jsonrpc_server *svr,
}
void
-ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr)
+ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr, uint64_t limit)
{
struct shash_node *node;
+ uint64_t elapsed = 0;
+ uint64_t start_time = time_msec();
+
+ if (svr->must_wake_up) {
+ svr->must_wake_up = false;
+ }
SHASH_FOR_EACH (node, &svr->remotes) {
struct ovsdb_jsonrpc_remote *remote = node->data;
+ if (svr->skip_to) {
+ if (remote != svr->skip_to) {
+ continue;
+ } else {
+ svr->skip_to = NULL;
+ svr->must_wake_up = true;
+ }
+ }
if (remote->listener) {
struct stream *stream;
@@ -403,7 +428,17 @@ ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr)
}
}
- ovsdb_jsonrpc_session_run_all(remote);
+ /* We assume accept and session creation time to be
+ * negligible for the purposes of computing timeouts.
+ */
+ ovsdb_jsonrpc_session_run_all(remote, limit - elapsed);
+
+ elapsed = time_msec() - start_time;
+ if (elapsed > limit) {
+ svr->must_wake_up = true;
+ svr->skip_to = remote;
+ break;
+ }
}
}
@@ -412,6 +447,16 @@ ovsdb_jsonrpc_server_wait(struct ovsdb_jsonrpc_server *svr)
{
struct shash_node *node;
+ if (svr->must_wake_up) {
+ /* We have stopped processing due to a time constraint.
+ * In this case there is no point to walk all sessions
+ * and rebuild the poll structure for the poll loop.
+ */
+ poll_immediate_wake();
+ svr->must_wake_up = false;
+ return;
+ }
+
SHASH_FOR_EACH (node, &svr->remotes) {
struct ovsdb_jsonrpc_remote *remote = node->data;
@@ -570,6 +615,7 @@ ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s)
}
}
}
+
return jsonrpc_session_is_alive(s->js) ? 0 : ETIMEDOUT;
}
@@ -583,15 +629,43 @@ ovsdb_jsonrpc_session_set_options(struct ovsdb_jsonrpc_session *session,
}
static void
-ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *remote)
+fast_forward_list(struct ovs_list *list, struct ovs_list *element)
+{
+ struct ovs_list temp = OVS_LIST_INITIALIZER(&temp);
+
+ if (ovs_list_is_short(list)) {
+ return;
+ }
+ if (list->prev == element || element == list) {
+ return;
+ }
+ /* Cut the "not yet processed" part out of the list and move it to
+ * temp.
+ */
+ ovs_list_splice(&temp, element, list->prev);
+ /* Push the processed part after the not processed. */
+ ovs_list_push_back_all(&temp, list);
+ /* Swap back the rearranged list. */
+ ovs_list_push_back_all(list, &temp);
+}
+
+static void
+ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *remote,
+ uint64_t limit)
{
struct ovsdb_jsonrpc_session *s, *next;
+ uint64_t start_time = time_msec();
LIST_FOR_EACH_SAFE (s, next, node, &remote->sessions) {
int error = ovsdb_jsonrpc_session_run(s);
if (error) {
ovsdb_jsonrpc_session_close(s);
}
+
+ if (time_msec() - start_time > limit) {
+ fast_forward_list(&remote->sessions, &next->node);
+ break;
+ }
}
}
@@ -67,7 +67,7 @@ void ovsdb_jsonrpc_server_free_remote_status(
void ovsdb_jsonrpc_server_reconnect(struct ovsdb_jsonrpc_server *, bool force,
char *comment);
-void ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *);
+void ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *, uint64_t limit);
void ovsdb_jsonrpc_server_wait(struct ovsdb_jsonrpc_server *);
void ovsdb_jsonrpc_server_set_read_only(struct ovsdb_jsonrpc_server *,
@@ -216,7 +216,21 @@ main_loop(struct server_config *config,
reconfigure_remotes(jsonrpc, all_dbs, remotes),
&remotes_error);
report_error_if_changed(reconfigure_ssl(all_dbs), &ssl_error);
- ovsdb_jsonrpc_server_run(jsonrpc);
+
+ /* Figure out current processing time limit. */
+ uint64_t limit = UINT64_MAX;
+ SHASH_FOR_EACH (node, all_dbs) {
+ struct db *db = node->data;
+ uint64_t db_limit;
+
+ db_limit = ovsdb_storage_max_processing_time(db->db->storage);
+ limit = MIN(db_limit, limit);
+ }
+ if (ovs_replay_is_active()) {
+ limit = UINT64_MAX;
+ }
+
+ ovsdb_jsonrpc_server_run(jsonrpc, limit);
if (*is_backup) {
replication_run();
@@ -407,6 +407,12 @@ raft_make_address_passive(const char *address_)
}
}
+uint64_t
+raft_get_election_timer(const struct raft *raft)
+{
+ return raft->election_timer;
+}
+
static struct raft *
raft_alloc(void)
{
@@ -188,4 +188,7 @@ void raft_take_leadership(struct raft *);
void raft_transfer_leadership(struct raft *, const char *reason);
const struct uuid *raft_current_eid(const struct raft *);
+
+uint64_t raft_get_election_timer(const struct raft *);
+
#endif /* lib/raft.h */
@@ -647,3 +647,15 @@ ovsdb_storage_peek_last_eid(struct ovsdb_storage *storage)
}
return raft_current_eid(storage->raft);
}
+
+uint64_t
+ovsdb_storage_max_processing_time(struct ovsdb_storage *storage)
+{
+ if (!storage->raft) {
+ return UINT64_MAX;
+ }
+ if (raft_get_election_timer(storage->raft) > 2) {
+ return raft_get_election_timer(storage->raft) / 2;
+ }
+ return 1;
+}
@@ -97,4 +97,6 @@ struct ovsdb_schema *ovsdb_storage_read_schema(struct ovsdb_storage *);
const struct uuid *ovsdb_storage_peek_last_eid(struct ovsdb_storage *);
+uint64_t ovsdb_storage_max_processing_time(struct ovsdb_storage *);
+
#endif /* ovsdb/storage.h */