diff mbox

[ovs-dev,ovsdb-server,multithreading,RFC,3/9] ovsdb: Spawn threads to handle jsonrpc sessions

Message ID 1456992808-27582-3-git-send-email-azhou@ovn.org
State Changes Requested
Headers show

Commit Message

Andy Zhou March 3, 2016, 8:13 a.m. UTC
Add the basic infrastructure to create and destroy threads that
will be used to handle jsonrpc sessions.

This patch implements a simple logic to create thread on demand.
Using number of sessions a server services as an rough measure of
system load, A jsonrpc server creates more threads as the number of
sessions exceeds a pre defined number of session per thread, Until
a user specified maximum is reached.

The threading model is simple. OVSDB server starts up with no threads
(This is required to handle the --run option).  Once created,
a thread is never destroyed until the OVSDB server is shut down.

Session thread does not do anything useful. In fact, the threads
won't even be created because the max number threads is hard coded
to zero. Later patch will create and make use of those threads.

Signed-off-by: Andy Zhou <azhou@ovn.org>
---
 ovsdb/jsonrpc-server.c | 100 ++++++++++++++++++++++++++++++++++++++++++++++---
 ovsdb/jsonrpc-server.h |   4 +-
 ovsdb/ovsdb-server.c   |   4 +-
 3 files changed, 98 insertions(+), 10 deletions(-)
diff mbox

Patch

diff --git a/ovsdb/jsonrpc-server.c b/ovsdb/jsonrpc-server.c
index 660250d..63788ec 100644
--- a/ovsdb/jsonrpc-server.c
+++ b/ovsdb/jsonrpc-server.c
@@ -25,6 +25,8 @@ 
 #include "monitor.h"
 #include "json.h"
 #include "jsonrpc.h"
+#include "latch.h"
+#include "ovs-thread.h"
 #include "ovsdb-error.h"
 #include "ovsdb-parser.h"
 #include "ovsdb.h"
@@ -107,13 +109,32 @@  static struct json *ovsdb_jsonrpc_monitor_compose_update(
     struct ovsdb_jsonrpc_monitor *monitor, bool initial);
 
 
+struct sessions_thread {
+    pthread_t thread;
+
+    /* Controls thread exit. */
+    struct latch exit_latch;
+};
+
+static void sessions_thread_init(struct sessions_thread *);
+static void sessions_thread_exit(struct sessions_thread *);
+
+
 /* JSON-RPC database server. */
 
 struct ovsdb_jsonrpc_server {
     struct ovsdb_server up;
     unsigned int n_sessions;
     struct shash remotes;      /* Contains "struct ovsdb_jsonrpc_remote *"s. */
-    struct ovs_list all_sessions; /* All 'ovsdb_jsonrpc_session's.   */
+    struct ovs_list all_sessions; /* All 'ovsdb_jsonrpc_session's managed
+                                     by the main process. Those are sessions
+                                     created before multithreading, or
+                                     connections of active remotes.. */
+
+    /* Threads. */
+    size_t n_max_threads;
+    size_t n_active_threads;
+    struct sessions_thread *threads;
 };
 
 /* Cast an 'ovsdb_server' pointer down into an ovsdb_jsonrpc_server pinter.
@@ -123,6 +144,12 @@  ovsdb_jsonrpc_server_cast(struct ovsdb_server *s) {
     return CONTAINER_OF(s, struct ovsdb_jsonrpc_server, up);
 }
 
+/* The minimal number of sessions per thread, before a new thread will be
+ * added, until 'n_max_threads' is reached. Once created, the thread
+ * will not destroyed. */
+#define N_SESSIONS_THREASHHOLD  (2)
+static bool ovsdb_jsonrpc_server_use_threads(struct ovsdb_jsonrpc_server *);
+
 /* A configured remote.  This is either a passive stream listener plus a list
  * of the currently connected sessions, or a list of exactly one active
  * session. */
@@ -140,15 +167,20 @@  static void ovsdb_jsonrpc_server_del_remote(struct shash_node *);
 
 /* Creates and returns a new server to provide JSON-RPC access to an OVSDB.
  *
+ * 'max_threads' limits * the number of threads it can create.
+ *
  * The caller must call ovsdb_jsonrpc_server_add_db() for each database to
- * which 'server' should provide access. */
+ * which 'server' should provide access.  */
 struct ovsdb_jsonrpc_server *
-ovsdb_jsonrpc_server_create(void)
+ovsdb_jsonrpc_server_create(size_t n_max_threads)
 {
     struct ovsdb_jsonrpc_server *server = xzalloc(sizeof *server);
     ovsdb_server_init(&server->up);
     shash_init(&server->remotes);
     list_init(&server->all_sessions);
+    server->n_max_threads = n_max_threads;
+    server->n_active_threads = 0;
+    server->threads = xmalloc(sizeof *server->threads * server->n_max_threads);
     return server;
 }
 
@@ -192,10 +224,18 @@  void
 ovsdb_jsonrpc_server_destroy(struct ovsdb_jsonrpc_server *svr)
 {
     struct shash_node *node, *next;
+    size_t i;
 
     SHASH_FOR_EACH_SAFE (node, next, &svr->remotes) {
         ovsdb_jsonrpc_server_del_remote(node);
     }
+
+    for (i = 0; i < svr->n_active_threads; i++) {
+        struct sessions_thread *thread = &svr->threads[i];
+        sessions_thread_exit(thread);
+    }
+    free(svr->threads);
+
     shash_destroy(&svr->remotes);
     ovsdb_server_destroy(&svr->up);
     free(svr);
@@ -362,10 +402,12 @@  ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr)
 
             error = pstream_accept(remote->listener, &stream);
             if (!error) {
-                struct jsonrpc_session *js;
-                js = jsonrpc_session_open_unreliably(jsonrpc_open(stream),
+                if (!ovsdb_jsonrpc_server_use_threads(svr)) {
+                    struct jsonrpc_session *js;
+                    js = jsonrpc_session_open_unreliably(jsonrpc_open(stream),
                                                      remote->dscp);
-                ovsdb_jsonrpc_session_create(remote, js);
+                    ovsdb_jsonrpc_session_create(remote, js);
+                }
             } else if (error != EAGAIN) {
                 VLOG_WARN_RL(&rl, "%s: accept failed: %s",
                              pstream_get_name(remote->listener),
@@ -402,6 +444,21 @@  ovsdb_jsonrpc_server_get_memory_usage(const struct ovsdb_jsonrpc_server *svr,
     simap_increase(usage, "sessions", svr->n_sessions);
     ovsdb_jsonrpc_session_get_memory_usage_all(svr, usage);
 }
+
+static bool
+ovsdb_jsonrpc_server_use_threads(struct ovsdb_jsonrpc_server *svr) {
+    if (svr->n_active_threads != svr->n_max_threads) {
+        /* Look up the number of sessions to decide if there is a need
+         * for a new thread.  */
+        size_t n_desired_threads = svr->n_sessions /  N_SESSIONS_THREASHHOLD;
+
+        if (n_desired_threads > svr->n_active_threads) {
+            sessions_thread_init(&svr->threads[svr->n_active_threads++]);
+        }
+    }
+
+    return svr->n_active_threads;
+}
 
 /* JSON-RPC database server session. */
 
@@ -1437,3 +1494,34 @@  ovsdb_jsonrpc_sessions_set_options(struct ovs_list *sessions,
         }
     }
 }
+
+static void *
+sessions_thread_main(void * f_)
+{
+    struct sessions_thread *sessions_thread = f_;
+
+    VLOG_DBG("Created");
+    while (!latch_is_set(&sessions_thread->exit_latch)) {
+        latch_wait(&sessions_thread->exit_latch);
+        poll_block();
+    }
+    VLOG_DBG("Finished");
+    return NULL;
+}
+
+static void
+sessions_thread_init(struct sessions_thread *thread)
+{
+    thread->thread = ovs_thread_create("sessions_thread",
+                                       sessions_thread_main, thread);
+
+    latch_init(&thread->exit_latch);
+}
+
+static void
+sessions_thread_exit(struct sessions_thread *thread)
+{
+    latch_set(&thread->exit_latch);
+    xpthread_join(thread->thread, NULL);
+    latch_destroy(&thread->exit_latch);
+}
diff --git a/ovsdb/jsonrpc-server.h b/ovsdb/jsonrpc-server.h
index 7a0bd31..3be34e7 100644
--- a/ovsdb/jsonrpc-server.h
+++ b/ovsdb/jsonrpc-server.h
@@ -1,4 +1,4 @@ 
-/* Copyright (c) 2009, 2010, 2011, 2012, 2013 Nicira, Inc.
+/* Copyright (c) 2009, 2010, 2011, 2012, 2013, 2016 Nicira, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -23,7 +23,7 @@  struct ovsdb;
 struct shash;
 struct simap;
 
-struct ovsdb_jsonrpc_server *ovsdb_jsonrpc_server_create(void);
+struct ovsdb_jsonrpc_server *ovsdb_jsonrpc_server_create(size_t n_max_threads);
 bool ovsdb_jsonrpc_server_add_db(struct ovsdb_jsonrpc_server *,
                                  struct ovsdb *);
 bool ovsdb_jsonrpc_server_remove_db(struct ovsdb_jsonrpc_server *,
diff --git a/ovsdb/ovsdb-server.c b/ovsdb/ovsdb-server.c
index fa662b1..8dd79c4 100644
--- a/ovsdb/ovsdb-server.c
+++ b/ovsdb/ovsdb-server.c
@@ -1,4 +1,4 @@ 
-/* Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014 Nicira, Inc.
+/* Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2016 Nicira, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -256,7 +256,7 @@  main(int argc, char *argv[])
 
     /* Load the saved config. */
     load_config(config_tmpfile, &remotes, &db_filenames);
-    jsonrpc = ovsdb_jsonrpc_server_create();
+    jsonrpc = ovsdb_jsonrpc_server_create(0);
 
     shash_init(&all_dbs);
     server_config.all_dbs = &all_dbs;