Message ID | 20210722150344.11713-1-anton.ivanov@cambridgegreys.com |
---|---|
State | Changes Requested |
Headers | show |
Series | [ovs-dev,v5] ovsdb: provide raft and command interfaces with priority | expand |

Context | Check | Description |
---|---|---|
ovsrobot/apply-robot | success | apply and check: success |
ovsrobot/github-robot | fail | github build: failed |
On Thu, 2021-07-22 at 16:03 +0100, anton.ivanov@cambridgegreys.com wrote: > From: Anton Ivanov <anton.ivanov@cambridgegreys.com> > > Set a soft time limit of "raft election timer"/2 on ovsdb > processing. > > This improves behaviour in large heavily loaded clusters. > While it cannot fully eliminate spurious raft elections > under heavy load, it significantly decreases their number. > > Processing is (to the extent possible) restarted where it > stopped on the previous iteration to ensure that sessions > towards the tail of the session list are not starved. > > Signed-off-by: Anton Ivanov <anton.ivanov@cambridgegreys.com> > --- > ovsdb/jsonrpc-server.c | 92 > ++++++++++++++++++++++++++++++++++++++++-- > ovsdb/jsonrpc-server.h | 2 +- > ovsdb/ovsdb-server.c | 23 ++++++++++- > ovsdb/raft.c | 6 +++ > ovsdb/raft.h | 3 ++ > ovsdb/storage.c | 12 ++++++ > ovsdb/storage.h | 2 + > 7 files changed, 134 insertions(+), 6 deletions(-) > > diff --git a/ovsdb/jsonrpc-server.c b/ovsdb/jsonrpc-server.c > index 351c39d8a..13c437811 100644 > --- a/ovsdb/jsonrpc-server.c > +++ b/ovsdb/jsonrpc-server.c > @@ -60,7 +60,8 @@ static struct ovsdb_jsonrpc_session > *ovsdb_jsonrpc_session_create( > struct ovsdb_jsonrpc_remote *, struct jsonrpc_session *, bool); > static void ovsdb_jsonrpc_session_preremove_db(struct > ovsdb_jsonrpc_remote *, > struct ovsdb *); > -static void ovsdb_jsonrpc_session_run_all(struct > ovsdb_jsonrpc_remote *); > +static void ovsdb_jsonrpc_session_run_all(struct > ovsdb_jsonrpc_remote *, > + uint64_t limit); > static void ovsdb_jsonrpc_session_wait_all(struct > ovsdb_jsonrpc_remote *); > static void ovsdb_jsonrpc_session_get_memory_usage_all( > const struct ovsdb_jsonrpc_remote *, struct simap *usage); > @@ -128,6 +129,11 @@ struct ovsdb_jsonrpc_server { > bool read_only; /* This server is does not accept any > transactions that can modify the > database. */ > struct shash remotes; /* Contains "struct > ovsdb_jsonrpc_remote *"s. 
*/ > + struct ovsdb_jsonrpc_remote *skip_to; /* Pointer to remote where > processing > + should restart after a > time > + constraint > interruption. */ > + bool must_wake_up; /* The processing loop must be re-run. It was > + interrupted due to exceeding a time > constraint. */ > }; > > /* A configured remote. This is either a passive stream listener > plus a list > @@ -137,6 +143,9 @@ struct ovsdb_jsonrpc_remote { > struct ovsdb_jsonrpc_server *server; > struct pstream *listener; /* Listener, if passive. */ > struct ovs_list sessions; /* List of "struct > ovsdb_jsonrpc_session"s. */ > + struct ovsdb_jsonrpc_session *skip_to; /* Session at which > processing > + should restart after > an > + interruption. */ > uint8_t dscp; > bool read_only; > char *role; > @@ -279,6 +288,7 @@ ovsdb_jsonrpc_server_add_remote(struct > ovsdb_jsonrpc_server *svr, > remote->dscp = options->dscp; > remote->read_only = options->read_only; > remote->role = nullable_xstrdup(options->role); > + remote->skip_to = NULL; > shash_add(&svr->remotes, name, remote); > > if (!listener) { > @@ -293,6 +303,11 @@ ovsdb_jsonrpc_server_del_remote(struct > shash_node *node) > { > struct ovsdb_jsonrpc_remote *remote = node->data; > > + /* safest option - rerun all remotes */ > + if (remote->server->skip_to) { > + remote->server->skip_to = NULL; > + } > + I think Dumitru said this too, but shouldn't this just be: remote->server->skip_to = NULL; and completely skip the if()? 
Dan > ovsdb_jsonrpc_session_close_all(remote); > pstream_close(remote->listener); > shash_delete(&remote->server->remotes, node); > @@ -378,12 +393,24 @@ ovsdb_jsonrpc_server_set_read_only(struct > ovsdb_jsonrpc_server *svr, > } > > void > -ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr) > +ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr, uint64_t > limit) > { > struct shash_node *node; > + uint64_t elapsed = 0; > + uint64_t start_time = time_msec(); > + > + svr->must_wake_up = false; > > SHASH_FOR_EACH (node, &svr->remotes) { > struct ovsdb_jsonrpc_remote *remote = node->data; > + if (svr->skip_to) { > + if (remote != svr->skip_to) { > + continue; > + } else { > + svr->skip_to = NULL; > + svr->must_wake_up = true; > + } > + } > > if (remote->listener) { > struct stream *stream; > @@ -403,7 +430,17 @@ ovsdb_jsonrpc_server_run(struct > ovsdb_jsonrpc_server *svr) > } > } > > - ovsdb_jsonrpc_session_run_all(remote); > + /* We assume accept and session creation time to be > + * negligible for the purposes of computing timeouts. > + */ > + ovsdb_jsonrpc_session_run_all(remote, limit - elapsed); > + > + elapsed = time_msec() - start_time; > + if (elapsed > limit) { > + svr->must_wake_up = true; > + svr->skip_to = remote; > + break; > + } > } > } > > @@ -412,6 +449,16 @@ ovsdb_jsonrpc_server_wait(struct > ovsdb_jsonrpc_server *svr) > { > struct shash_node *node; > > + if (svr->must_wake_up) { > + /* We have stopped processing due to a time constraint. > + * In this case there is no point to walk all sessions > + * and rebuild the poll structure for the poll loop. 
> + */ > + poll_immediate_wake(); > + svr->must_wake_up = false; > + return; > + } > + > SHASH_FOR_EACH (node, &svr->remotes) { > struct ovsdb_jsonrpc_remote *remote = node->data; > > @@ -583,15 +630,52 @@ ovsdb_jsonrpc_session_set_options(struct > ovsdb_jsonrpc_session *session, > } > > static void > -ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *remote) > +ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *remote, > + uint64_t limit) > { > struct ovsdb_jsonrpc_session *s, *next; > + uint64_t start_time = time_msec(); > > LIST_FOR_EACH_SAFE (s, next, node, &remote->sessions) { > + if (remote->skip_to && s != remote->skip_to) { > + /* Processing was interrupted, we skip to the point > + * where we had to interrupt it. > + * We cannot use the _CONTINUE macro as it is not safe > + * if the list has been changed in the meantime. > + */ > + continue; > + } > + > + /* Set ->next as skip point if we need to restart > processing. > + * This way, even if current is removed, we always have a > + * valid pointer to continue processing. > + */ > + remote->skip_to = > + CONTAINER_OF(ovs_list_front(&s->node), > + struct ovsdb_jsonrpc_session, node); > + > + if (remote->skip_to == s) { > + /* The list is a singleton. This is a special case, > where > + * deleting the node will invalidate the pointer. > + */ > + remote->skip_to = NULL; > + } > + > int error = ovsdb_jsonrpc_session_run(s); > if (error) { > ovsdb_jsonrpc_session_close(s); > } > + > + if (time_msec() - start_time > limit) { > + /* We bail leaving skip_to set. Next processing > iteration > + * will skip everything up to skip_to which was set to > + * ->next earlier on. > + */ > + break; > + } else { > + /* We are within time constraints, clear skip_to. 
*/ > + remote->skip_to = NULL; > + } > } > } > > diff --git a/ovsdb/jsonrpc-server.h b/ovsdb/jsonrpc-server.h > index e0653aa39..218152e9d 100644 > --- a/ovsdb/jsonrpc-server.h > +++ b/ovsdb/jsonrpc-server.h > @@ -67,7 +67,7 @@ void ovsdb_jsonrpc_server_free_remote_status( > void ovsdb_jsonrpc_server_reconnect(struct ovsdb_jsonrpc_server *, > bool force, > char *comment); > > -void ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *); > +void ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *, > uint64_t limit); > void ovsdb_jsonrpc_server_wait(struct ovsdb_jsonrpc_server *); > > void ovsdb_jsonrpc_server_set_read_only(struct ovsdb_jsonrpc_server > *, > diff --git a/ovsdb/ovsdb-server.c b/ovsdb/ovsdb-server.c > index 0b3d2bb71..3fcedf617 100644 > --- a/ovsdb/ovsdb-server.c > +++ b/ovsdb/ovsdb-server.c > @@ -216,7 +216,28 @@ main_loop(struct server_config *config, > reconfigure_remotes(jsonrpc, all_dbs, remotes), > &remotes_error); > report_error_if_changed(reconfigure_ssl(all_dbs), > &ssl_error); > - ovsdb_jsonrpc_server_run(jsonrpc); > + > + /* Figure out current processing time limit. 
*/ > + > + bool first_db = true; > + uint64_t limit = UINT64_MAX; > + SHASH_FOR_EACH (node, all_dbs) { > + struct db *db = node->data; > + uint64_t db_limit; > + > + db_limit = ovsdb_storage_max_processing_time(db->db- > >storage); > + if (first_db) { > + /* reset the limit */ > + limit = db_limit; > + first_db = false; > + } > + limit = MIN(db_limit, limit); > + } > + if (ovs_replay_is_active()) { > + limit = UINT64_MAX; > + } > + > + ovsdb_jsonrpc_server_run(jsonrpc, limit); > > if (*is_backup) { > replication_run(); > diff --git a/ovsdb/raft.c b/ovsdb/raft.c > index 2fb515651..183463aba 100644 > --- a/ovsdb/raft.c > +++ b/ovsdb/raft.c > @@ -407,6 +407,12 @@ raft_make_address_passive(const char *address_) > } > } > > +uint64_t > +raft_get_election_timer(const struct raft *raft) > +{ > + return raft->election_timer; > +} > + > static struct raft * > raft_alloc(void) > { > diff --git a/ovsdb/raft.h b/ovsdb/raft.h > index 3545c41c2..575e7f609 100644 > --- a/ovsdb/raft.h > +++ b/ovsdb/raft.h > @@ -188,4 +188,7 @@ void raft_take_leadership(struct raft *); > void raft_transfer_leadership(struct raft *, const char *reason); > > const struct uuid *raft_current_eid(const struct raft *); > + > +uint64_t raft_get_election_timer(const struct raft *); > + > #endif /* lib/raft.h */ > diff --git a/ovsdb/storage.c b/ovsdb/storage.c > index d727b1eac..58018fe6d 100644 > --- a/ovsdb/storage.c > +++ b/ovsdb/storage.c > @@ -647,3 +647,15 @@ ovsdb_storage_peek_last_eid(struct ovsdb_storage > *storage) > } > return raft_current_eid(storage->raft); > } > + > +uint64_t > +ovsdb_storage_max_processing_time(struct ovsdb_storage *storage) > +{ > + if (!storage->raft) { > + return UINT64_MAX; > + } > + if (raft_get_election_timer(storage->raft) > 2) { > + return raft_get_election_timer(storage->raft) / 2; > + } > + return 1; > +} > diff --git a/ovsdb/storage.h b/ovsdb/storage.h > index e120094d7..a8a02e0bd 100644 > --- a/ovsdb/storage.h > +++ b/ovsdb/storage.h > @@ -97,4 +97,6 @@ struct 
ovsdb_schema > *ovsdb_storage_read_schema(struct ovsdb_storage *); > > const struct uuid *ovsdb_storage_peek_last_eid(struct ovsdb_storage > *); > > +uint64_t ovsdb_storage_max_processing_time(struct ovsdb_storage *); > + > #endif /* ovsdb/storage.h */
Addressed in v6, apologies for missing it. Brgds, On 22/07/2021 16:22, Dan Williams wrote: > On Thu, 2021-07-22 at 16:03 +0100, anton.ivanov@cambridgegreys.com > wrote: >> From: Anton Ivanov <anton.ivanov@cambridgegreys.com> >> >> Set a soft time limit of "raft election timer"/2 on ovsdb >> processing. >> >> This improves behaviour in large heavily loaded clusters. >> While it cannot fully eliminate spurious raft elections >> under heavy load, it significantly decreases their number. >> >> Processing is (to the extent possible) restarted where it >> stopped on the previous iteration to ensure that sessions >> towards the tail of the session list are not starved. >> >> Signed-off-by: Anton Ivanov <anton.ivanov@cambridgegreys.com> >> --- >> ovsdb/jsonrpc-server.c | 92 >> ++++++++++++++++++++++++++++++++++++++++-- >> ovsdb/jsonrpc-server.h | 2 +- >> ovsdb/ovsdb-server.c | 23 ++++++++++- >> ovsdb/raft.c | 6 +++ >> ovsdb/raft.h | 3 ++ >> ovsdb/storage.c | 12 ++++++ >> ovsdb/storage.h | 2 + >> 7 files changed, 134 insertions(+), 6 deletions(-) >> >> diff --git a/ovsdb/jsonrpc-server.c b/ovsdb/jsonrpc-server.c >> index 351c39d8a..13c437811 100644 >> --- a/ovsdb/jsonrpc-server.c >> +++ b/ovsdb/jsonrpc-server.c >> @@ -60,7 +60,8 @@ static struct ovsdb_jsonrpc_session >> *ovsdb_jsonrpc_session_create( >> struct ovsdb_jsonrpc_remote *, struct jsonrpc_session *, bool); >> static void ovsdb_jsonrpc_session_preremove_db(struct >> ovsdb_jsonrpc_remote *, >> struct ovsdb *); >> -static void ovsdb_jsonrpc_session_run_all(struct >> ovsdb_jsonrpc_remote *); >> +static void ovsdb_jsonrpc_session_run_all(struct >> ovsdb_jsonrpc_remote *, >> + uint64_t limit); >> static void ovsdb_jsonrpc_session_wait_all(struct >> ovsdb_jsonrpc_remote *); >> static void ovsdb_jsonrpc_session_get_memory_usage_all( >> const struct ovsdb_jsonrpc_remote *, struct simap *usage); >> @@ -128,6 +129,11 @@ struct ovsdb_jsonrpc_server { >> bool read_only; /* This server is does not accept any >> transactions 
that can modify the >> database. */ >> struct shash remotes; /* Contains "struct >> ovsdb_jsonrpc_remote *"s. */ >> + struct ovsdb_jsonrpc_remote *skip_to; /* Pointer to remote where >> processing >> + should restart after a >> time >> + constraint >> interruption. */ >> + bool must_wake_up; /* The processing loop must be re-run. It was >> + interrupted due to exceeding a time >> constraint. */ >> }; >> >> /* A configured remote. This is either a passive stream listener >> plus a list >> @@ -137,6 +143,9 @@ struct ovsdb_jsonrpc_remote { >> struct ovsdb_jsonrpc_server *server; >> struct pstream *listener; /* Listener, if passive. */ >> struct ovs_list sessions; /* List of "struct >> ovsdb_jsonrpc_session"s. */ >> + struct ovsdb_jsonrpc_session *skip_to; /* Session at which >> processing >> + should restart after >> an >> + interruption. */ >> uint8_t dscp; >> bool read_only; >> char *role; >> @@ -279,6 +288,7 @@ ovsdb_jsonrpc_server_add_remote(struct >> ovsdb_jsonrpc_server *svr, >> remote->dscp = options->dscp; >> remote->read_only = options->read_only; >> remote->role = nullable_xstrdup(options->role); >> + remote->skip_to = NULL; >> shash_add(&svr->remotes, name, remote); >> >> if (!listener) { >> @@ -293,6 +303,11 @@ ovsdb_jsonrpc_server_del_remote(struct >> shash_node *node) >> { >> struct ovsdb_jsonrpc_remote *remote = node->data; >> >> + /* safest option - rerun all remotes */ >> + if (remote->server->skip_to) { >> + remote->server->skip_to = NULL; >> + } >> + > I think Dumitru said this too, but shouldn't this just be: > > remote->server->skip_to = NULL; > > and completely skip the if()? 
> > Dan > >> ovsdb_jsonrpc_session_close_all(remote); >> pstream_close(remote->listener); >> shash_delete(&remote->server->remotes, node); >> @@ -378,12 +393,24 @@ ovsdb_jsonrpc_server_set_read_only(struct >> ovsdb_jsonrpc_server *svr, >> } >> >> void >> -ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr) >> +ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr, uint64_t >> limit) >> { >> struct shash_node *node; >> + uint64_t elapsed = 0; >> + uint64_t start_time = time_msec(); >> + >> + svr->must_wake_up = false; >> >> SHASH_FOR_EACH (node, &svr->remotes) { >> struct ovsdb_jsonrpc_remote *remote = node->data; >> + if (svr->skip_to) { >> + if (remote != svr->skip_to) { >> + continue; >> + } else { >> + svr->skip_to = NULL; >> + svr->must_wake_up = true; >> + } >> + } >> >> if (remote->listener) { >> struct stream *stream; >> @@ -403,7 +430,17 @@ ovsdb_jsonrpc_server_run(struct >> ovsdb_jsonrpc_server *svr) >> } >> } >> >> - ovsdb_jsonrpc_session_run_all(remote); >> + /* We assume accept and session creation time to be >> + * negligible for the purposes of computing timeouts. >> + */ >> + ovsdb_jsonrpc_session_run_all(remote, limit - elapsed); >> + >> + elapsed = time_msec() - start_time; >> + if (elapsed > limit) { >> + svr->must_wake_up = true; >> + svr->skip_to = remote; >> + break; >> + } >> } >> } >> >> @@ -412,6 +449,16 @@ ovsdb_jsonrpc_server_wait(struct >> ovsdb_jsonrpc_server *svr) >> { >> struct shash_node *node; >> >> + if (svr->must_wake_up) { >> + /* We have stopped processing due to a time constraint. >> + * In this case there is no point to walk all sessions >> + * and rebuild the poll structure for the poll loop. 
>> + */ >> + poll_immediate_wake(); >> + svr->must_wake_up = false; >> + return; >> + } >> + >> SHASH_FOR_EACH (node, &svr->remotes) { >> struct ovsdb_jsonrpc_remote *remote = node->data; >> >> @@ -583,15 +630,52 @@ ovsdb_jsonrpc_session_set_options(struct >> ovsdb_jsonrpc_session *session, >> } >> >> static void >> -ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *remote) >> +ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *remote, >> + uint64_t limit) >> { >> struct ovsdb_jsonrpc_session *s, *next; >> + uint64_t start_time = time_msec(); >> >> LIST_FOR_EACH_SAFE (s, next, node, &remote->sessions) { >> + if (remote->skip_to && s != remote->skip_to) { >> + /* Processing was interrupted, we skip to the point >> + * where we had to interrupt it. >> + * We cannot use the _CONTINUE macro as it is not safe >> + * if the list has been changed in the meantime. >> + */ >> + continue; >> + } >> + >> + /* Set ->next as skip point if we need to restart >> processing. >> + * This way, even if current is removed, we always have a >> + * valid pointer to continue processing. >> + */ >> + remote->skip_to = >> + CONTAINER_OF(ovs_list_front(&s->node), >> + struct ovsdb_jsonrpc_session, node); >> + >> + if (remote->skip_to == s) { >> + /* The list is a singleton. This is a special case, >> where >> + * deleting the node will invalidate the pointer. >> + */ >> + remote->skip_to = NULL; >> + } >> + >> int error = ovsdb_jsonrpc_session_run(s); >> if (error) { >> ovsdb_jsonrpc_session_close(s); >> } >> + >> + if (time_msec() - start_time > limit) { >> + /* We bail leaving skip_to set. Next processing >> iteration >> + * will skip everything up to skip_to which was set to >> + * ->next earlier on. >> + */ >> + break; >> + } else { >> + /* We are within time constraints, clear skip_to. 
*/ >> + remote->skip_to = NULL; >> + } >> } >> } >> >> diff --git a/ovsdb/jsonrpc-server.h b/ovsdb/jsonrpc-server.h >> index e0653aa39..218152e9d 100644 >> --- a/ovsdb/jsonrpc-server.h >> +++ b/ovsdb/jsonrpc-server.h >> @@ -67,7 +67,7 @@ void ovsdb_jsonrpc_server_free_remote_status( >> void ovsdb_jsonrpc_server_reconnect(struct ovsdb_jsonrpc_server *, >> bool force, >> char *comment); >> >> -void ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *); >> +void ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *, >> uint64_t limit); >> void ovsdb_jsonrpc_server_wait(struct ovsdb_jsonrpc_server *); >> >> void ovsdb_jsonrpc_server_set_read_only(struct ovsdb_jsonrpc_server >> *, >> diff --git a/ovsdb/ovsdb-server.c b/ovsdb/ovsdb-server.c >> index 0b3d2bb71..3fcedf617 100644 >> --- a/ovsdb/ovsdb-server.c >> +++ b/ovsdb/ovsdb-server.c >> @@ -216,7 +216,28 @@ main_loop(struct server_config *config, >> reconfigure_remotes(jsonrpc, all_dbs, remotes), >> &remotes_error); >> report_error_if_changed(reconfigure_ssl(all_dbs), >> &ssl_error); >> - ovsdb_jsonrpc_server_run(jsonrpc); >> + >> + /* Figure out current processing time limit. 
*/ >> + >> + bool first_db = true; >> + uint64_t limit = UINT64_MAX; >> + SHASH_FOR_EACH (node, all_dbs) { >> + struct db *db = node->data; >> + uint64_t db_limit; >> + >> + db_limit = ovsdb_storage_max_processing_time(db->db- >>> storage); >> + if (first_db) { >> + /* reset the limit */ >> + limit = db_limit; >> + first_db = false; >> + } >> + limit = MIN(db_limit, limit); >> + } >> + if (ovs_replay_is_active()) { >> + limit = UINT64_MAX; >> + } >> + >> + ovsdb_jsonrpc_server_run(jsonrpc, limit); >> >> if (*is_backup) { >> replication_run(); >> diff --git a/ovsdb/raft.c b/ovsdb/raft.c >> index 2fb515651..183463aba 100644 >> --- a/ovsdb/raft.c >> +++ b/ovsdb/raft.c >> @@ -407,6 +407,12 @@ raft_make_address_passive(const char *address_) >> } >> } >> >> +uint64_t >> +raft_get_election_timer(const struct raft *raft) >> +{ >> + return raft->election_timer; >> +} >> + >> static struct raft * >> raft_alloc(void) >> { >> diff --git a/ovsdb/raft.h b/ovsdb/raft.h >> index 3545c41c2..575e7f609 100644 >> --- a/ovsdb/raft.h >> +++ b/ovsdb/raft.h >> @@ -188,4 +188,7 @@ void raft_take_leadership(struct raft *); >> void raft_transfer_leadership(struct raft *, const char *reason); >> >> const struct uuid *raft_current_eid(const struct raft *); >> + >> +uint64_t raft_get_election_timer(const struct raft *); >> + >> #endif /* lib/raft.h */ >> diff --git a/ovsdb/storage.c b/ovsdb/storage.c >> index d727b1eac..58018fe6d 100644 >> --- a/ovsdb/storage.c >> +++ b/ovsdb/storage.c >> @@ -647,3 +647,15 @@ ovsdb_storage_peek_last_eid(struct ovsdb_storage >> *storage) >> } >> return raft_current_eid(storage->raft); >> } >> + >> +uint64_t >> +ovsdb_storage_max_processing_time(struct ovsdb_storage *storage) >> +{ >> + if (!storage->raft) { >> + return UINT64_MAX; >> + } >> + if (raft_get_election_timer(storage->raft) > 2) { >> + return raft_get_election_timer(storage->raft) / 2; >> + } >> + return 1; >> +} >> diff --git a/ovsdb/storage.h b/ovsdb/storage.h >> index e120094d7..a8a02e0bd 100644 >> 
--- a/ovsdb/storage.h >> +++ b/ovsdb/storage.h >> @@ -97,4 +97,6 @@ struct ovsdb_schema >> *ovsdb_storage_read_schema(struct ovsdb_storage *); >> >> const struct uuid *ovsdb_storage_peek_last_eid(struct ovsdb_storage >> *); >> >> +uint64_t ovsdb_storage_max_processing_time(struct ovsdb_storage *); >> + >> #endif /* ovsdb/storage.h */ > >
diff --git a/ovsdb/jsonrpc-server.c b/ovsdb/jsonrpc-server.c index 351c39d8a..13c437811 100644 --- a/ovsdb/jsonrpc-server.c +++ b/ovsdb/jsonrpc-server.c @@ -60,7 +60,8 @@ static struct ovsdb_jsonrpc_session *ovsdb_jsonrpc_session_create( struct ovsdb_jsonrpc_remote *, struct jsonrpc_session *, bool); static void ovsdb_jsonrpc_session_preremove_db(struct ovsdb_jsonrpc_remote *, struct ovsdb *); -static void ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *); +static void ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *, + uint64_t limit); static void ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_remote *); static void ovsdb_jsonrpc_session_get_memory_usage_all( const struct ovsdb_jsonrpc_remote *, struct simap *usage); @@ -128,6 +129,11 @@ struct ovsdb_jsonrpc_server { bool read_only; /* This server is does not accept any transactions that can modify the database. */ struct shash remotes; /* Contains "struct ovsdb_jsonrpc_remote *"s. */ + struct ovsdb_jsonrpc_remote *skip_to; /* Pointer to remote where processing + should restart after a time + constraint interruption. */ + bool must_wake_up; /* The processing loop must be re-run. It was + interrupted due to exceeding a time constraint. */ }; /* A configured remote. This is either a passive stream listener plus a list @@ -137,6 +143,9 @@ struct ovsdb_jsonrpc_remote { struct ovsdb_jsonrpc_server *server; struct pstream *listener; /* Listener, if passive. */ struct ovs_list sessions; /* List of "struct ovsdb_jsonrpc_session"s. */ + struct ovsdb_jsonrpc_session *skip_to; /* Session at which processing + should restart after an + interruption. 
*/ uint8_t dscp; bool read_only; char *role; @@ -279,6 +288,7 @@ ovsdb_jsonrpc_server_add_remote(struct ovsdb_jsonrpc_server *svr, remote->dscp = options->dscp; remote->read_only = options->read_only; remote->role = nullable_xstrdup(options->role); + remote->skip_to = NULL; shash_add(&svr->remotes, name, remote); if (!listener) { @@ -293,6 +303,11 @@ ovsdb_jsonrpc_server_del_remote(struct shash_node *node) { struct ovsdb_jsonrpc_remote *remote = node->data; + /* safest option - rerun all remotes */ + if (remote->server->skip_to) { + remote->server->skip_to = NULL; + } + ovsdb_jsonrpc_session_close_all(remote); pstream_close(remote->listener); shash_delete(&remote->server->remotes, node); @@ -378,12 +393,24 @@ ovsdb_jsonrpc_server_set_read_only(struct ovsdb_jsonrpc_server *svr, } void -ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr) +ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr, uint64_t limit) { struct shash_node *node; + uint64_t elapsed = 0; + uint64_t start_time = time_msec(); + + svr->must_wake_up = false; SHASH_FOR_EACH (node, &svr->remotes) { struct ovsdb_jsonrpc_remote *remote = node->data; + if (svr->skip_to) { + if (remote != svr->skip_to) { + continue; + } else { + svr->skip_to = NULL; + svr->must_wake_up = true; + } + } if (remote->listener) { struct stream *stream; @@ -403,7 +430,17 @@ ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr) } } - ovsdb_jsonrpc_session_run_all(remote); + /* We assume accept and session creation time to be + * negligible for the purposes of computing timeouts. + */ + ovsdb_jsonrpc_session_run_all(remote, limit - elapsed); + + elapsed = time_msec() - start_time; + if (elapsed > limit) { + svr->must_wake_up = true; + svr->skip_to = remote; + break; + } } } @@ -412,6 +449,16 @@ ovsdb_jsonrpc_server_wait(struct ovsdb_jsonrpc_server *svr) { struct shash_node *node; + if (svr->must_wake_up) { + /* We have stopped processing due to a time constraint. 
+ * In this case there is no point to walk all sessions + * and rebuild the poll structure for the poll loop. + */ + poll_immediate_wake(); + svr->must_wake_up = false; + return; + } + SHASH_FOR_EACH (node, &svr->remotes) { struct ovsdb_jsonrpc_remote *remote = node->data; @@ -583,15 +630,52 @@ ovsdb_jsonrpc_session_set_options(struct ovsdb_jsonrpc_session *session, } static void -ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *remote) +ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *remote, + uint64_t limit) { struct ovsdb_jsonrpc_session *s, *next; + uint64_t start_time = time_msec(); LIST_FOR_EACH_SAFE (s, next, node, &remote->sessions) { + if (remote->skip_to && s != remote->skip_to) { + /* Processing was interrupted, we skip to the point + * where we had to interrupt it. + * We cannot use the _CONTINUE macro as it is not safe + * if the list has been changed in the meantime. + */ + continue; + } + + /* Set ->next as skip point if we need to restart processing. + * This way, even if current is removed, we always have a + * valid pointer to continue processing. + */ + remote->skip_to = + CONTAINER_OF(ovs_list_front(&s->node), + struct ovsdb_jsonrpc_session, node); + + if (remote->skip_to == s) { + /* The list is a singleton. This is a special case, where + * deleting the node will invalidate the pointer. + */ + remote->skip_to = NULL; + } + int error = ovsdb_jsonrpc_session_run(s); if (error) { ovsdb_jsonrpc_session_close(s); } + + if (time_msec() - start_time > limit) { + /* We bail leaving skip_to set. Next processing iteration + * will skip everything up to skip_to which was set to + * ->next earlier on. + */ + break; + } else { + /* We are within time constraints, clear skip_to. 
*/ + remote->skip_to = NULL; + } } } diff --git a/ovsdb/jsonrpc-server.h b/ovsdb/jsonrpc-server.h index e0653aa39..218152e9d 100644 --- a/ovsdb/jsonrpc-server.h +++ b/ovsdb/jsonrpc-server.h @@ -67,7 +67,7 @@ void ovsdb_jsonrpc_server_free_remote_status( void ovsdb_jsonrpc_server_reconnect(struct ovsdb_jsonrpc_server *, bool force, char *comment); -void ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *); +void ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *, uint64_t limit); void ovsdb_jsonrpc_server_wait(struct ovsdb_jsonrpc_server *); void ovsdb_jsonrpc_server_set_read_only(struct ovsdb_jsonrpc_server *, diff --git a/ovsdb/ovsdb-server.c b/ovsdb/ovsdb-server.c index 0b3d2bb71..3fcedf617 100644 --- a/ovsdb/ovsdb-server.c +++ b/ovsdb/ovsdb-server.c @@ -216,7 +216,28 @@ main_loop(struct server_config *config, reconfigure_remotes(jsonrpc, all_dbs, remotes), &remotes_error); report_error_if_changed(reconfigure_ssl(all_dbs), &ssl_error); - ovsdb_jsonrpc_server_run(jsonrpc); + + /* Figure out current processing time limit. 
*/ + + bool first_db = true; + uint64_t limit = UINT64_MAX; + SHASH_FOR_EACH (node, all_dbs) { + struct db *db = node->data; + uint64_t db_limit; + + db_limit = ovsdb_storage_max_processing_time(db->db->storage); + if (first_db) { + /* reset the limit */ + limit = db_limit; + first_db = false; + } + limit = MIN(db_limit, limit); + } + if (ovs_replay_is_active()) { + limit = UINT64_MAX; + } + + ovsdb_jsonrpc_server_run(jsonrpc, limit); if (*is_backup) { replication_run(); diff --git a/ovsdb/raft.c b/ovsdb/raft.c index 2fb515651..183463aba 100644 --- a/ovsdb/raft.c +++ b/ovsdb/raft.c @@ -407,6 +407,12 @@ raft_make_address_passive(const char *address_) } } +uint64_t +raft_get_election_timer(const struct raft *raft) +{ + return raft->election_timer; +} + static struct raft * raft_alloc(void) { diff --git a/ovsdb/raft.h b/ovsdb/raft.h index 3545c41c2..575e7f609 100644 --- a/ovsdb/raft.h +++ b/ovsdb/raft.h @@ -188,4 +188,7 @@ void raft_take_leadership(struct raft *); void raft_transfer_leadership(struct raft *, const char *reason); const struct uuid *raft_current_eid(const struct raft *); + +uint64_t raft_get_election_timer(const struct raft *); + #endif /* lib/raft.h */ diff --git a/ovsdb/storage.c b/ovsdb/storage.c index d727b1eac..58018fe6d 100644 --- a/ovsdb/storage.c +++ b/ovsdb/storage.c @@ -647,3 +647,15 @@ ovsdb_storage_peek_last_eid(struct ovsdb_storage *storage) } return raft_current_eid(storage->raft); } + +uint64_t +ovsdb_storage_max_processing_time(struct ovsdb_storage *storage) +{ + if (!storage->raft) { + return UINT64_MAX; + } + if (raft_get_election_timer(storage->raft) > 2) { + return raft_get_election_timer(storage->raft) / 2; + } + return 1; +} diff --git a/ovsdb/storage.h b/ovsdb/storage.h index e120094d7..a8a02e0bd 100644 --- a/ovsdb/storage.h +++ b/ovsdb/storage.h @@ -97,4 +97,6 @@ struct ovsdb_schema *ovsdb_storage_read_schema(struct ovsdb_storage *); const struct uuid *ovsdb_storage_peek_last_eid(struct ovsdb_storage *); +uint64_t 
ovsdb_storage_max_processing_time(struct ovsdb_storage *); + #endif /* ovsdb/storage.h */