Message ID | 1456992808-27582-1-git-send-email-azhou@ovn.org |
---|---|
State | Changes Requested |
Headers | show |
This series is also available at github at https://github.com/azhou-nicira/ovs-review/tree/mt On Thu, Mar 3, 2016 at 12:13 AM, Andy Zhou <azhou@ovn.org> wrote: > Currently ovsdb_jsonrpc_session are grouped together in a linked > list within 'ovsdb_jsonrpc_remote'. This makes sense since most > session operations applies to sessions within a remote. > > However, in order to scale up ovsdb-server with multi-threading, it is > more convenient to distribute a sessions to any thread available, > regardless which remote it is associated with. > > This patch introduces a set of APIs that provide operations on > a list of sessions. Instead of group sessions by remote, they > are linked together in a new ovs_list field 'all_sessions' in the > ovsdb_jsonrpc_server struct. > > With multi-threading, the design is that all sessions managed > by a thread will have them linked together on a thread private > linked list. At that time, the 'all_sessions' field in > ovsdb_jsonrpc_server struct will have all session managed > the main process. > > Signed-off-by: Andy Zhou <azhou@ovn.org> > --- > ovsdb/jsonrpc-server.c | 242 > ++++++++++++++++++++++++++++--------------------- > 1 file changed, 140 insertions(+), 102 deletions(-) > > diff --git a/ovsdb/jsonrpc-server.c b/ovsdb/jsonrpc-server.c > index 15dbc4e..56dddc6 100644 > --- a/ovsdb/jsonrpc-server.c > +++ b/ovsdb/jsonrpc-server.c > @@ -1,4 +1,4 @@ > -/* Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2015 Nicira, Inc. > +/* Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2015, 2016 Nicira, > Inc. > * > * Licensed under the Apache License, Version 2.0 (the "License"); > * you may not use this file except in compliance with the License. > @@ -52,23 +52,30 @@ static bool monitor2_enable__ = true; > /* Message rate-limiting. */ > static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5); > > +/* Session set. */ > +static void ovsdb_jsonrpc_sessions_run(struct ovs_list *); > +static void ovsdb_jsonrpc_sessions_wait(struct ovs_list *); > +static void ovsdb_jsonrpc_sessions_close(struct ovs_list *, > + const struct ovsdb_jsonrpc_remote > *remote); > +static void ovsdb_jsonrpc_sessions_reconnect( struct ovs_list *, > + const struct ovsdb_jsonrpc_remote > *remote); > +static void ovsdb_jsonrpc_sessions_set_options(struct ovs_list *, > + const struct ovsdb_jsonrpc_remote > *remote, > + const struct ovsdb_jsonrpc_options *); > +static size_t ovsdb_jsonrpc_sessions_count(const struct ovs_list *, > + const struct ovsdb_jsonrpc_remote > *remote); > + > /* Sessions. */ > static struct ovsdb_jsonrpc_session *ovsdb_jsonrpc_session_create( > struct ovsdb_jsonrpc_remote *, struct jsonrpc_session *); > -static void ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *); > -static void ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_remote *); > -static void ovsdb_jsonrpc_session_get_memory_usage_all( > - const struct ovsdb_jsonrpc_remote *, struct simap *usage); > -static void ovsdb_jsonrpc_session_close_all(struct ovsdb_jsonrpc_remote > *); > -static void ovsdb_jsonrpc_session_reconnect_all(struct > ovsdb_jsonrpc_remote *); > -static void ovsdb_jsonrpc_session_set_all_options( > - struct ovsdb_jsonrpc_remote *, const struct ovsdb_jsonrpc_options *); > static bool ovsdb_jsonrpc_active_session_get_status( > - const struct ovsdb_jsonrpc_remote *, > + const struct ovsdb_jsonrpc_remote *remote, > struct ovsdb_jsonrpc_remote_status *); > static void ovsdb_jsonrpc_session_get_status( > const struct ovsdb_jsonrpc_session *, > struct ovsdb_jsonrpc_remote_status *); > +static void ovsdb_jsonrpc_session_get_memory_usage_all( > + const struct ovsdb_jsonrpc_server *, struct simap *usage); > static void ovsdb_jsonrpc_session_unlock_all(struct ovsdb_jsonrpc_session > *); > static void ovsdb_jsonrpc_session_unlock__(struct ovsdb_lock_waiter *); > static void ovsdb_jsonrpc_session_send(struct ovsdb_jsonrpc_session *, > @@ -106,15 +113,22 @@ struct ovsdb_jsonrpc_server { > struct ovsdb_server up; > unsigned int n_sessions; > struct shash remotes; /* Contains "struct ovsdb_jsonrpc_remote > *"s. */ > + struct ovs_list all_sessions; /* All 'ovsdb_jsonrpc_session's. */ > }; > > +/* Cast an 'ovsdb_server' pointer down into an ovsdb_jsonrpc_server > pinter. > + * Caller needs to make sure this conversion is valid. */ > +static struct ovsdb_jsonrpc_server * > +ovsdb_jsonrpc_server_cast(struct ovsdb_server *s) { > + return CONTAINER_OF(s, struct ovsdb_jsonrpc_server, up); > +} > + > /* A configured remote. This is either a passive stream listener plus a > list > * of the currently connected sessions, or a list of exactly one active > * session. */ > struct ovsdb_jsonrpc_remote { > struct ovsdb_jsonrpc_server *server; > struct pstream *listener; /* Listener, if passive. */ > - struct ovs_list sessions; /* List of "struct > ovsdb_jsonrpc_session"s. */ > uint8_t dscp; > }; > > @@ -134,6 +148,7 @@ ovsdb_jsonrpc_server_create(void) > struct ovsdb_jsonrpc_server *server = xzalloc(sizeof *server); > ovsdb_server_init(&server->up); > shash_init(&server->remotes); > + list_init(&server->all_sessions); > return server; > } > > @@ -232,7 +247,8 @@ ovsdb_jsonrpc_server_set_remotes(struct > ovsdb_jsonrpc_server *svr, > } > } > > - ovsdb_jsonrpc_session_set_all_options(remote, options); > + ovsdb_jsonrpc_sessions_set_options(&svr->all_sessions, remote, > + options); > } > } > > @@ -254,7 +270,6 @@ ovsdb_jsonrpc_server_add_remote(struct > ovsdb_jsonrpc_server *svr, > remote = xmalloc(sizeof *remote); > remote->server = svr; > remote->listener = listener; > - list_init(&remote->sessions); > remote->dscp = options->dscp; > shash_add(&svr->remotes, name, remote); > > @@ -268,10 +283,11 @@ static void > ovsdb_jsonrpc_server_del_remote(struct shash_node *node) > { > struct ovsdb_jsonrpc_remote *remote = node->data; > + struct ovsdb_jsonrpc_server *server = remote->server; > > - ovsdb_jsonrpc_session_close_all(remote); > + ovsdb_jsonrpc_sessions_close(&server->all_sessions, remote); > pstream_close(remote->listener); > - shash_delete(&remote->server->remotes, node); > + shash_delete(&server->remotes, node); > free(remote); > } > > @@ -297,9 +313,12 @@ ovsdb_jsonrpc_server_get_remote_status( > } > > if (remote->listener) { > + int n_connections = > ovsdb_jsonrpc_sessions_count(&svr->all_sessions, > + remote); > + > status->bound_port = pstream_get_bound_port(remote->listener); > - status->is_connected = !list_is_empty(&remote->sessions); > - status->n_connections = list_size(&remote->sessions); > + status->n_connections = n_connections; > + status->is_connected = (n_connections != 0); > return true; > } > > @@ -325,7 +344,7 @@ ovsdb_jsonrpc_server_reconnect(struct > ovsdb_jsonrpc_server *svr) > SHASH_FOR_EACH (node, &svr->remotes) { > struct ovsdb_jsonrpc_remote *remote = node->data; > > - ovsdb_jsonrpc_session_reconnect_all(remote); > + ovsdb_jsonrpc_sessions_reconnect(&svr->all_sessions, remote); > } > } > > @@ -354,7 +373,7 @@ ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server > *svr) > } > } > > - ovsdb_jsonrpc_session_run_all(remote); > + ovsdb_jsonrpc_sessions_run(&svr->all_sessions); > } > } > > @@ -370,7 +389,7 @@ ovsdb_jsonrpc_server_wait(struct ovsdb_jsonrpc_server > *svr) > pstream_wait(remote->listener); > } > > - ovsdb_jsonrpc_session_wait_all(remote); > + ovsdb_jsonrpc_sessions_wait(&svr->all_sessions); > } > } > > @@ -380,14 +399,8 @@ void > ovsdb_jsonrpc_server_get_memory_usage(const struct ovsdb_jsonrpc_server > *svr, > struct simap *usage) > { > - struct shash_node *node; > - > simap_increase(usage, "sessions", svr->n_sessions); > - SHASH_FOR_EACH (node, &svr->remotes) { > - struct ovsdb_jsonrpc_remote *remote = node->data; > - > - ovsdb_jsonrpc_session_get_memory_usage_all(remote, usage); > - } > + ovsdb_jsonrpc_session_get_memory_usage_all(svr, usage); > } > > /* JSON-RPC database server session. */ > @@ -423,17 +436,18 @@ ovsdb_jsonrpc_session_create(struct > ovsdb_jsonrpc_remote *remote, > struct jsonrpc_session *js) > { > struct ovsdb_jsonrpc_session *s; > + struct ovsdb_jsonrpc_server *server = remote->server; > > s = xzalloc(sizeof *s); > - ovsdb_session_init(&s->up, &remote->server->up); > + ovsdb_session_init(&s->up, &server->up); > s->remote = remote; > - list_push_back(&remote->sessions, &s->node); > + list_push_back(&server->all_sessions, &s->node); > hmap_init(&s->triggers); > hmap_init(&s->monitors); > s->js = js; > s->js_seqno = jsonrpc_session_get_seqno(js); > > - remote->server->n_sessions++; > + server->n_sessions++; > > return s; > } > @@ -441,6 +455,8 @@ ovsdb_jsonrpc_session_create(struct > ovsdb_jsonrpc_remote *remote, > static void > ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *s) > { > + struct ovsdb_jsonrpc_server *server; > + > ovsdb_jsonrpc_monitor_remove_all(s); > ovsdb_jsonrpc_session_unlock_all(s); > ovsdb_jsonrpc_trigger_complete_all(s); > @@ -450,7 +466,8 @@ ovsdb_jsonrpc_session_close(struct > ovsdb_jsonrpc_session *s) > > jsonrpc_session_close(s->js); > list_remove(&s->node); > - s->remote->server->n_sessions--; > + server = ovsdb_jsonrpc_server_cast(s->up.server); > + server->n_sessions--; > ovsdb_session_destroy(&s->up); > free(s); > } > @@ -501,19 +518,6 @@ ovsdb_jsonrpc_session_set_options(struct > ovsdb_jsonrpc_session *session, > } > > static void > -ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *remote) > -{ > - struct ovsdb_jsonrpc_session *s, *next; > - > - LIST_FOR_EACH_SAFE (s, next, node, &remote->sessions) { > - int error = ovsdb_jsonrpc_session_run(s); > - if (error) { > - ovsdb_jsonrpc_session_close(s); > - } > - } > -} > - > -static void > ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *s) > { > jsonrpc_session_wait(s->js); > @@ -527,16 +531,6 @@ ovsdb_jsonrpc_session_wait(struct > ovsdb_jsonrpc_session *s) > } > > static void > -ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_remote *remote) > -{ > - struct ovsdb_jsonrpc_session *s; > - > - LIST_FOR_EACH (s, node, &remote->sessions) { > - ovsdb_jsonrpc_session_wait(s); > - } > -} > - > -static void > ovsdb_jsonrpc_session_get_memory_usage(const struct ovsdb_jsonrpc_session > *s, > struct simap *usage) > { > @@ -546,65 +540,22 @@ ovsdb_jsonrpc_session_get_memory_usage(const struct > ovsdb_jsonrpc_session *s, > > static void > ovsdb_jsonrpc_session_get_memory_usage_all( > - const struct ovsdb_jsonrpc_remote *remote, > - struct simap *usage) > + const struct ovsdb_jsonrpc_server *svr, struct simap *usage) > { > struct ovsdb_jsonrpc_session *s; > > - LIST_FOR_EACH (s, node, &remote->sessions) { > + LIST_FOR_EACH (s, node, &svr->all_sessions) { > ovsdb_jsonrpc_session_get_memory_usage(s, usage); > } > } > > -static void > -ovsdb_jsonrpc_session_close_all(struct ovsdb_jsonrpc_remote *remote) > -{ > - struct ovsdb_jsonrpc_session *s, *next; > - > - LIST_FOR_EACH_SAFE (s, next, node, &remote->sessions) { > - ovsdb_jsonrpc_session_close(s); > - } > -} > - > -/* Forces all of the JSON-RPC sessions managed by 'remote' to disconnect > and > - * reconnect. */ > -static void > -ovsdb_jsonrpc_session_reconnect_all(struct ovsdb_jsonrpc_remote *remote) > -{ > - struct ovsdb_jsonrpc_session *s, *next; > - > - LIST_FOR_EACH_SAFE (s, next, node, &remote->sessions) { > - jsonrpc_session_force_reconnect(s->js); > - if (!jsonrpc_session_is_alive(s->js)) { > - ovsdb_jsonrpc_session_close(s); > - } > - } > -} > - > -/* Sets the options for all of the JSON-RPC sessions managed by 'remote' > to > - * 'options'. > - * > - * (The dscp value can't be changed directly; the caller must instead > close and > - * re-open the session.) */ > -static void > -ovsdb_jsonrpc_session_set_all_options( > - struct ovsdb_jsonrpc_remote *remote, > - const struct ovsdb_jsonrpc_options *options) > -{ > - struct ovsdb_jsonrpc_session *s; > - > - LIST_FOR_EACH (s, node, &remote->sessions) { > - ovsdb_jsonrpc_session_set_options(s, options); > - } > -} > - > /* Sets the 'status' of for the 'remote' with an outgoing connection. */ > static bool > ovsdb_jsonrpc_active_session_get_status( > const struct ovsdb_jsonrpc_remote *remote, > struct ovsdb_jsonrpc_remote_status *status) > { > - const struct ovs_list *sessions = &remote->sessions; > + const struct ovs_list *sessions = &remote->server->all_sessions; > const struct ovsdb_jsonrpc_session *s; > > if (list_is_empty(sessions)) { > @@ -764,8 +715,7 @@ ovsdb_jsonrpc_session_lock(struct > ovsdb_jsonrpc_session *s, > } > > /* Get the lock, add us as a waiter. */ > - waiter = ovsdb_server_lock(&s->remote->server->up, &s->up, lock_name, > mode, > - &victim); > + waiter = ovsdb_server_lock(s->up.server, &s->up, lock_name, mode, > &victim); > if (victim) { > ovsdb_jsonrpc_session_notify(victim, lock_name, "stolen"); > } > @@ -1399,3 +1349,91 @@ ovsdb_jsonrpc_disable_monitor2(void) > /* Once disabled, it is not possible to re-enable it. */ > monitor2_enable__ = false; > } > + > +static void > +ovsdb_jsonrpc_sessions_run(struct ovs_list *sessions) > +{ > + struct ovsdb_jsonrpc_session *s, *next; > + > + LIST_FOR_EACH_SAFE (s, next, node, sessions) { > + int error = ovsdb_jsonrpc_session_run(s); > + if (error) { > + ovsdb_jsonrpc_session_close(s); > + } > + } > +} > + > +static void > +ovsdb_jsonrpc_sessions_wait(struct ovs_list *sessions) > +{ > + struct ovsdb_jsonrpc_session *s; > + > + LIST_FOR_EACH (s, node, sessions) { > + ovsdb_jsonrpc_session_wait(s); > + } > +} > + > +static void > +ovsdb_jsonrpc_sessions_close(struct ovs_list *sessions, > + const struct ovsdb_jsonrpc_remote *remote) > +{ > + struct ovsdb_jsonrpc_session *s, *next; > + > + LIST_FOR_EACH_SAFE (s, next, node, sessions) { > + if (s->remote == remote) { > + ovsdb_jsonrpc_session_close(s); > + } > + } > +} > + > +/* Forces all of the JSON-RPC sessions to disconnect and > + * reconnect. */ > +static void > +ovsdb_jsonrpc_sessions_reconnect(struct ovs_list *sessions, > + const struct ovsdb_jsonrpc_remote > *remote) > +{ > + struct ovsdb_jsonrpc_session *s, *next; > + > + LIST_FOR_EACH_SAFE (s, next, node, sessions) { > + if (s->remote == remote) { > + jsonrpc_session_force_reconnect(s->js); > + if (!jsonrpc_session_is_alive(s->js)) { > + ovsdb_jsonrpc_session_close(s); > + } > + } > + } > +} > + > +static size_t > +ovsdb_jsonrpc_sessions_count(const struct ovs_list *sessions, > + const struct ovsdb_jsonrpc_remote *remote) > +{ > + struct ovsdb_jsonrpc_session *s = NULL; > + size_t count = 0; > + > + LIST_FOR_EACH (s, node, sessions) { > + if (s->remote == remote) { > + count++; > + } > + } > + return count; > +} > + > +/* Sets the options for all of the JSON-RPC sessions managed by 'remote' > to > + * 'options'. > + * > + * (The dscp value can't be changed directly; the caller must instead > close and > + * re-open the session.) */ > +static void > +ovsdb_jsonrpc_sessions_set_options(struct ovs_list *sessions, > + const struct ovsdb_jsonrpc_remote > *remote, > + const struct ovsdb_jsonrpc_options > *options) > +{ > + struct ovsdb_jsonrpc_session *s; > + > + LIST_FOR_EACH (s, node, sessions) { > + if (s->remote == remote) { > + ovsdb_jsonrpc_session_set_options(s, options); > + } > + } > +} > -- > 1.9.1 > >
diff --git a/ovsdb/jsonrpc-server.c b/ovsdb/jsonrpc-server.c index 15dbc4e..56dddc6 100644 --- a/ovsdb/jsonrpc-server.c +++ b/ovsdb/jsonrpc-server.c @@ -1,4 +1,4 @@ -/* Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2015 Nicira, Inc. +/* Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2015, 2016 Nicira, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -52,23 +52,30 @@ static bool monitor2_enable__ = true; /* Message rate-limiting. */ static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5); +/* Session set. */ +static void ovsdb_jsonrpc_sessions_run(struct ovs_list *); +static void ovsdb_jsonrpc_sessions_wait(struct ovs_list *); +static void ovsdb_jsonrpc_sessions_close(struct ovs_list *, + const struct ovsdb_jsonrpc_remote *remote); +static void ovsdb_jsonrpc_sessions_reconnect( struct ovs_list *, + const struct ovsdb_jsonrpc_remote *remote); +static void ovsdb_jsonrpc_sessions_set_options(struct ovs_list *, + const struct ovsdb_jsonrpc_remote *remote, + const struct ovsdb_jsonrpc_options *); +static size_t ovsdb_jsonrpc_sessions_count(const struct ovs_list *, + const struct ovsdb_jsonrpc_remote *remote); + /* Sessions. */ static struct ovsdb_jsonrpc_session *ovsdb_jsonrpc_session_create( struct ovsdb_jsonrpc_remote *, struct jsonrpc_session *); -static void ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *); -static void ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_remote *); -static void ovsdb_jsonrpc_session_get_memory_usage_all( - const struct ovsdb_jsonrpc_remote *, struct simap *usage); -static void ovsdb_jsonrpc_session_close_all(struct ovsdb_jsonrpc_remote *); -static void ovsdb_jsonrpc_session_reconnect_all(struct ovsdb_jsonrpc_remote *); -static void ovsdb_jsonrpc_session_set_all_options( - struct ovsdb_jsonrpc_remote *, const struct ovsdb_jsonrpc_options *); static bool ovsdb_jsonrpc_active_session_get_status( - const struct ovsdb_jsonrpc_remote *, + const struct ovsdb_jsonrpc_remote *remote, struct ovsdb_jsonrpc_remote_status *); static void ovsdb_jsonrpc_session_get_status( const struct ovsdb_jsonrpc_session *, struct ovsdb_jsonrpc_remote_status *); +static void ovsdb_jsonrpc_session_get_memory_usage_all( + const struct ovsdb_jsonrpc_server *, struct simap *usage); static void ovsdb_jsonrpc_session_unlock_all(struct ovsdb_jsonrpc_session *); static void ovsdb_jsonrpc_session_unlock__(struct ovsdb_lock_waiter *); static void ovsdb_jsonrpc_session_send(struct ovsdb_jsonrpc_session *, @@ -106,15 +113,22 @@ struct ovsdb_jsonrpc_server { struct ovsdb_server up; unsigned int n_sessions; struct shash remotes; /* Contains "struct ovsdb_jsonrpc_remote *"s. */ + struct ovs_list all_sessions; /* All 'ovsdb_jsonrpc_session's. */ }; +/* Cast an 'ovsdb_server' pointer down into an ovsdb_jsonrpc_server pinter. + * Caller needs to make sure this conversion is valid. */ +static struct ovsdb_jsonrpc_server * +ovsdb_jsonrpc_server_cast(struct ovsdb_server *s) { + return CONTAINER_OF(s, struct ovsdb_jsonrpc_server, up); +} + /* A configured remote. This is either a passive stream listener plus a list * of the currently connected sessions, or a list of exactly one active * session. */ struct ovsdb_jsonrpc_remote { struct ovsdb_jsonrpc_server *server; struct pstream *listener; /* Listener, if passive. */ - struct ovs_list sessions; /* List of "struct ovsdb_jsonrpc_session"s. */ uint8_t dscp; }; @@ -134,6 +148,7 @@ ovsdb_jsonrpc_server_create(void) struct ovsdb_jsonrpc_server *server = xzalloc(sizeof *server); ovsdb_server_init(&server->up); shash_init(&server->remotes); + list_init(&server->all_sessions); return server; } @@ -232,7 +247,8 @@ ovsdb_jsonrpc_server_set_remotes(struct ovsdb_jsonrpc_server *svr, } } - ovsdb_jsonrpc_session_set_all_options(remote, options); + ovsdb_jsonrpc_sessions_set_options(&svr->all_sessions, remote, + options); } } @@ -254,7 +270,6 @@ ovsdb_jsonrpc_server_add_remote(struct ovsdb_jsonrpc_server *svr, remote = xmalloc(sizeof *remote); remote->server = svr; remote->listener = listener; - list_init(&remote->sessions); remote->dscp = options->dscp; shash_add(&svr->remotes, name, remote); @@ -268,10 +283,11 @@ static void ovsdb_jsonrpc_server_del_remote(struct shash_node *node) { struct ovsdb_jsonrpc_remote *remote = node->data; + struct ovsdb_jsonrpc_server *server = remote->server; - ovsdb_jsonrpc_session_close_all(remote); + ovsdb_jsonrpc_sessions_close(&server->all_sessions, remote); pstream_close(remote->listener); - shash_delete(&remote->server->remotes, node); + shash_delete(&server->remotes, node); free(remote); } @@ -297,9 +313,12 @@ ovsdb_jsonrpc_server_get_remote_status( } if (remote->listener) { + int n_connections = ovsdb_jsonrpc_sessions_count(&svr->all_sessions, + remote); + status->bound_port = pstream_get_bound_port(remote->listener); - status->is_connected = !list_is_empty(&remote->sessions); - status->n_connections = list_size(&remote->sessions); + status->n_connections = n_connections; + status->is_connected = (n_connections != 0); return true; } @@ -325,7 +344,7 @@ ovsdb_jsonrpc_server_reconnect(struct ovsdb_jsonrpc_server *svr) SHASH_FOR_EACH (node, &svr->remotes) { struct ovsdb_jsonrpc_remote *remote = node->data; - ovsdb_jsonrpc_session_reconnect_all(remote); + ovsdb_jsonrpc_sessions_reconnect(&svr->all_sessions, remote); } } @@ -354,7 +373,7 @@ ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr) } } - ovsdb_jsonrpc_session_run_all(remote); + ovsdb_jsonrpc_sessions_run(&svr->all_sessions); } } @@ -370,7 +389,7 @@ ovsdb_jsonrpc_server_wait(struct ovsdb_jsonrpc_server *svr) pstream_wait(remote->listener); } - ovsdb_jsonrpc_session_wait_all(remote); + ovsdb_jsonrpc_sessions_wait(&svr->all_sessions); } } @@ -380,14 +399,8 @@ void ovsdb_jsonrpc_server_get_memory_usage(const struct ovsdb_jsonrpc_server *svr, struct simap *usage) { - struct shash_node *node; - simap_increase(usage, "sessions", svr->n_sessions); - SHASH_FOR_EACH (node, &svr->remotes) { - struct ovsdb_jsonrpc_remote *remote = node->data; - - ovsdb_jsonrpc_session_get_memory_usage_all(remote, usage); - } + ovsdb_jsonrpc_session_get_memory_usage_all(svr, usage); } /* JSON-RPC database server session. */ @@ -423,17 +436,18 @@ ovsdb_jsonrpc_session_create(struct ovsdb_jsonrpc_remote *remote, struct jsonrpc_session *js) { struct ovsdb_jsonrpc_session *s; + struct ovsdb_jsonrpc_server *server = remote->server; s = xzalloc(sizeof *s); - ovsdb_session_init(&s->up, &remote->server->up); + ovsdb_session_init(&s->up, &server->up); s->remote = remote; - list_push_back(&remote->sessions, &s->node); + list_push_back(&server->all_sessions, &s->node); hmap_init(&s->triggers); hmap_init(&s->monitors); s->js = js; s->js_seqno = jsonrpc_session_get_seqno(js); - remote->server->n_sessions++; + server->n_sessions++; return s; } @@ -441,6 +455,8 @@ ovsdb_jsonrpc_session_create(struct ovsdb_jsonrpc_remote *remote, static void ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *s) { + struct ovsdb_jsonrpc_server *server; + ovsdb_jsonrpc_monitor_remove_all(s); ovsdb_jsonrpc_session_unlock_all(s); ovsdb_jsonrpc_trigger_complete_all(s); @@ -450,7 +466,8 @@ ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *s) jsonrpc_session_close(s->js); list_remove(&s->node); - s->remote->server->n_sessions--; + server = ovsdb_jsonrpc_server_cast(s->up.server); + server->n_sessions--; ovsdb_session_destroy(&s->up); free(s); } @@ -501,19 +518,6 @@ ovsdb_jsonrpc_session_set_options(struct ovsdb_jsonrpc_session *session, } static void -ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *remote) -{ - struct ovsdb_jsonrpc_session *s, *next; - - LIST_FOR_EACH_SAFE (s, next, node, &remote->sessions) { - int error = ovsdb_jsonrpc_session_run(s); - if (error) { - ovsdb_jsonrpc_session_close(s); - } - } -} - -static void ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *s) { jsonrpc_session_wait(s->js); @@ -527,16 +531,6 @@ ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *s) } static void -ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_remote *remote) -{ - struct ovsdb_jsonrpc_session *s; - - LIST_FOR_EACH (s, node, &remote->sessions) { - ovsdb_jsonrpc_session_wait(s); - } -} - -static void ovsdb_jsonrpc_session_get_memory_usage(const struct ovsdb_jsonrpc_session *s, struct simap *usage) { @@ -546,65 +540,22 @@ ovsdb_jsonrpc_session_get_memory_usage(const struct ovsdb_jsonrpc_session *s, static void ovsdb_jsonrpc_session_get_memory_usage_all( - const struct ovsdb_jsonrpc_remote *remote, - struct simap *usage) + const struct ovsdb_jsonrpc_server *svr, struct simap *usage) { struct ovsdb_jsonrpc_session *s; - LIST_FOR_EACH (s, node, &remote->sessions) { + LIST_FOR_EACH (s, node, &svr->all_sessions) { ovsdb_jsonrpc_session_get_memory_usage(s, usage); } } -static void -ovsdb_jsonrpc_session_close_all(struct ovsdb_jsonrpc_remote *remote) -{ - struct ovsdb_jsonrpc_session *s, *next; - - LIST_FOR_EACH_SAFE (s, next, node, &remote->sessions) { - ovsdb_jsonrpc_session_close(s); - } -} - -/* Forces all of the JSON-RPC sessions managed by 'remote' to disconnect and - * reconnect. */ -static void -ovsdb_jsonrpc_session_reconnect_all(struct ovsdb_jsonrpc_remote *remote) -{ - struct ovsdb_jsonrpc_session *s, *next; - - LIST_FOR_EACH_SAFE (s, next, node, &remote->sessions) { - jsonrpc_session_force_reconnect(s->js); - if (!jsonrpc_session_is_alive(s->js)) { - ovsdb_jsonrpc_session_close(s); - } - } -} - -/* Sets the options for all of the JSON-RPC sessions managed by 'remote' to - * 'options'. - * - * (The dscp value can't be changed directly; the caller must instead close and - * re-open the session.) */ -static void -ovsdb_jsonrpc_session_set_all_options( - struct ovsdb_jsonrpc_remote *remote, - const struct ovsdb_jsonrpc_options *options) -{ - struct ovsdb_jsonrpc_session *s; - - LIST_FOR_EACH (s, node, &remote->sessions) { - ovsdb_jsonrpc_session_set_options(s, options); - } -} - /* Sets the 'status' of for the 'remote' with an outgoing connection. */ static bool ovsdb_jsonrpc_active_session_get_status( const struct ovsdb_jsonrpc_remote *remote, struct ovsdb_jsonrpc_remote_status *status) { - const struct ovs_list *sessions = &remote->sessions; + const struct ovs_list *sessions = &remote->server->all_sessions; const struct ovsdb_jsonrpc_session *s; if (list_is_empty(sessions)) { @@ -764,8 +715,7 @@ ovsdb_jsonrpc_session_lock(struct ovsdb_jsonrpc_session *s, } /* Get the lock, add us as a waiter. */ - waiter = ovsdb_server_lock(&s->remote->server->up, &s->up, lock_name, mode, - &victim); + waiter = ovsdb_server_lock(s->up.server, &s->up, lock_name, mode, &victim); if (victim) { ovsdb_jsonrpc_session_notify(victim, lock_name, "stolen"); } @@ -1399,3 +1349,91 @@ ovsdb_jsonrpc_disable_monitor2(void) /* Once disabled, it is not possible to re-enable it. */ monitor2_enable__ = false; } + +static void +ovsdb_jsonrpc_sessions_run(struct ovs_list *sessions) +{ + struct ovsdb_jsonrpc_session *s, *next; + + LIST_FOR_EACH_SAFE (s, next, node, sessions) { + int error = ovsdb_jsonrpc_session_run(s); + if (error) { + ovsdb_jsonrpc_session_close(s); + } + } +} + +static void +ovsdb_jsonrpc_sessions_wait(struct ovs_list *sessions) +{ + struct ovsdb_jsonrpc_session *s; + + LIST_FOR_EACH (s, node, sessions) { + ovsdb_jsonrpc_session_wait(s); + } +} + +static void +ovsdb_jsonrpc_sessions_close(struct ovs_list *sessions, + const struct ovsdb_jsonrpc_remote *remote) +{ + struct ovsdb_jsonrpc_session *s, *next; + + LIST_FOR_EACH_SAFE (s, next, node, sessions) { + if (s->remote == remote) { + ovsdb_jsonrpc_session_close(s); + } + } +} + +/* Forces all of the JSON-RPC sessions to disconnect and + * reconnect. */ +static void +ovsdb_jsonrpc_sessions_reconnect(struct ovs_list *sessions, + const struct ovsdb_jsonrpc_remote *remote) +{ + struct ovsdb_jsonrpc_session *s, *next; + + LIST_FOR_EACH_SAFE (s, next, node, sessions) { + if (s->remote == remote) { + jsonrpc_session_force_reconnect(s->js); + if (!jsonrpc_session_is_alive(s->js)) { + ovsdb_jsonrpc_session_close(s); + } + } + } +} + +static size_t +ovsdb_jsonrpc_sessions_count(const struct ovs_list *sessions, + const struct ovsdb_jsonrpc_remote *remote) +{ + struct ovsdb_jsonrpc_session *s = NULL; + size_t count = 0; + + LIST_FOR_EACH (s, node, sessions) { + if (s->remote == remote) { + count++; + } + } + return count; +} + +/* Sets the options for all of the JSON-RPC sessions managed by 'remote' to + * 'options'. + * + * (The dscp value can't be changed directly; the caller must instead close and + * re-open the session.) */ +static void +ovsdb_jsonrpc_sessions_set_options(struct ovs_list *sessions, + const struct ovsdb_jsonrpc_remote *remote, + const struct ovsdb_jsonrpc_options *options) +{ + struct ovsdb_jsonrpc_session *s; + + LIST_FOR_EACH (s, node, sessions) { + if (s->remote == remote) { + ovsdb_jsonrpc_session_set_options(s, options); + } + } +}
Currently ovsdb_jsonrpc_session are grouped together in a linked list within 'ovsdb_jsonrpc_remote'. This makes sense since most session operations applies to sessions within a remote. However, in order to scale up ovsdb-server with multi-threading, it is more convenient to distribute a sessions to any thread available, regardless which remote it is associated with. This patch introduces a set of APIs that provide operations on a list of sessions. Instead of group sessions by remote, they are linked together in a new ovs_list field 'all_sessions' in the ovsdb_jsonrpc_server struct. With multi-threading, the design is that all sessions managed by a thread will have them linked together on a thread private linked list. At that time, the 'all_sessions' field in ovsdb_jsonrpc_server struct will have all session managed the main process. Signed-off-by: Andy Zhou <azhou@ovn.org> --- ovsdb/jsonrpc-server.c | 242 ++++++++++++++++++++++++++++--------------------- 1 file changed, 140 insertions(+), 102 deletions(-)