diff mbox series

[ovs-dev,RFC,06/52] jsonrpc: Allow jsonrpc_session to have more than one remote.

Message ID 20170919220125.32535-7-blp@ovn.org
State RFC
Headers show
Series clustering implementation | expand

Commit Message

Ben Pfaff Sept. 19, 2017, 10 p.m. UTC
The implementation cycles through the remotes in random order.  This allows
clients to perform some load balancing across alternative implementations
of a service.

Signed-off-by: Ben Pfaff <blp@ovn.org>
---
 lib/jsonrpc.c | 66 ++++++++++++++++++++++++++++++++++++++++++++++++++++++-----
 lib/jsonrpc.h |  5 ++++-
 2 files changed, 65 insertions(+), 6 deletions(-)
diff mbox series

Patch

diff --git a/lib/jsonrpc.c b/lib/jsonrpc.c
index 2fae057de181..838334daa9a8 100644
--- a/lib/jsonrpc.c
+++ b/lib/jsonrpc.c
@@ -1,5 +1,5 @@ 
 /*
- * Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2015, 2016 Nicira, Inc.
+ * Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2015, 2016, 2017 Nicira, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -28,6 +28,7 @@ 
 #include "openvswitch/ofpbuf.h"
 #include "ovs-thread.h"
 #include "poll-loop.h"
+#include "random.h"
 #include "reconnect.h"
 #include "stream.h"
 #include "timeval.h"
@@ -753,6 +754,10 @@  jsonrpc_msg_to_json(struct jsonrpc_msg *m)
 /* A JSON-RPC session with reconnection. */
 
 struct jsonrpc_session {
+    char **remotes;
+    size_t n_remotes;
+    size_t next_remote;
+
     struct reconnect *reconnect;
     struct jsonrpc *rpc;
     struct stream *stream;
@@ -762,6 +767,13 @@  struct jsonrpc_session {
     uint8_t dscp;
 };
 
+static void
+jsonrpc_session_pick_remote(struct jsonrpc_session *s)
+{
+    reconnect_set_name(s->reconnect,
+                       s->remotes[s->next_remote++ % s->n_remotes]);
+}
+
 /* Creates and returns a jsonrpc_session to 'name', which should be a string
  * acceptable to stream_open() or pstream_open().
  *
@@ -779,12 +791,36 @@  struct jsonrpc_session {
 struct jsonrpc_session *
 jsonrpc_session_open(const char *name, bool retry)
 {
+    return jsonrpc_session_open_multiple(&name, 1, retry);
+}
+
+struct jsonrpc_session *
+jsonrpc_session_open_multiple(const char **names, size_t n, bool retry)
+{
     struct jsonrpc_session *s;
 
     s = xmalloc(sizeof *s);
+
+    /* Set 'n' remotes from 'names', shuffling them into random order. */
+    ovs_assert(n > 0);
+    s->remotes = xmalloc(n * sizeof *s->remotes);
+    for (size_t i = 0; i < n; i++) {
+        s->remotes[i] = xstrdup(names[i]);
+    }
+    s->n_remotes = n;
+    for (size_t i = 0; i < n; i++) {
+        size_t j = i + random_range(n - i);
+        char **r = s->remotes;
+        char *tmp = r[i];
+        r[i] = r[j];
+        r[j] = tmp;
+    }
+    s->next_remote = 0;
+
     s->reconnect = reconnect_create(time_msec());
-    reconnect_set_name(s->reconnect, name);
+    jsonrpc_session_pick_remote(s);
     reconnect_enable(s->reconnect, time_msec());
+    reconnect_set_backoff_free_tries(s->reconnect, n);
     s->rpc = NULL;
     s->stream = NULL;
     s->pstream = NULL;
@@ -792,10 +828,11 @@  jsonrpc_session_open(const char *name, bool retry)
     s->dscp = 0;
     s->last_error = 0;
 
+    const char *name = reconnect_get_name(s->reconnect);
     if (!pstream_verify_name(name)) {
         reconnect_set_passive(s->reconnect, true, time_msec());
     } else if (!retry) {
-        reconnect_set_max_tries(s->reconnect, 1);
+        reconnect_set_max_tries(s->reconnect, n);
         reconnect_set_backoff(s->reconnect, INT_MAX, INT_MAX);
     }
 
@@ -817,6 +854,10 @@  jsonrpc_session_open_unreliably(struct jsonrpc *jsonrpc, uint8_t dscp)
     struct jsonrpc_session *s;
 
     s = xmalloc(sizeof *s);
+    s->remotes = xmalloc(sizeof *s->remotes);
+    s->remotes[0] = xstrdup(jsonrpc_get_name(jsonrpc));
+    s->n_remotes = 1;
+    s->next_remote = 0;
     s->reconnect = reconnect_create(time_msec());
     reconnect_set_quiet(s->reconnect, true);
     reconnect_set_name(s->reconnect, jsonrpc_get_name(jsonrpc));
@@ -839,6 +880,10 @@  jsonrpc_session_close(struct jsonrpc_session *s)
         reconnect_destroy(s->reconnect);
         stream_close(s->stream);
         pstream_close(s->pstream);
+        for (size_t i = 0; i < s->n_remotes; i++) {
+            free(s->remotes[i]);
+        }
+        free(s->remotes);
         free(s);
     }
 }
@@ -850,12 +895,15 @@  jsonrpc_session_disconnect(struct jsonrpc_session *s)
         jsonrpc_error(s->rpc, EOF);
         jsonrpc_close(s->rpc);
         s->rpc = NULL;
-        s->seqno++;
     } else if (s->stream) {
         stream_close(s->stream);
         s->stream = NULL;
-        s->seqno++;
+    } else {
+        return;
     }
+
+    s->seqno++;
+    jsonrpc_session_pick_remote(s);
 }
 
 static void
@@ -882,6 +930,7 @@  jsonrpc_session_connect(struct jsonrpc_session *s)
 
     if (error) {
         reconnect_connect_failed(s->reconnect, time_msec(), error);
+        jsonrpc_session_pick_remote(s);
     }
     s->seqno++;
 }
@@ -945,6 +994,7 @@  jsonrpc_session_run(struct jsonrpc_session *s)
             s->stream = NULL;
         } else if (error != EAGAIN) {
             reconnect_connect_failed(s->reconnect, time_msec(), error);
+            jsonrpc_session_pick_remote(s);
             stream_close(s->stream);
             s->stream = NULL;
             s->last_error = error;
@@ -1015,6 +1065,12 @@  jsonrpc_session_get_id(const struct jsonrpc_session *s)
     }
 }
 
+size_t
+jsonrpc_session_get_n_remotes(const struct jsonrpc_session *s)
+{
+    return s->n_remotes;
+}
+
 /* Always takes ownership of 'msg', regardless of success. */
 int
 jsonrpc_session_send(struct jsonrpc_session *s, struct jsonrpc_msg *msg)
diff --git a/lib/jsonrpc.h b/lib/jsonrpc.h
index 9b4fb0e51374..97110c00ff05 100644
--- a/lib/jsonrpc.h
+++ b/lib/jsonrpc.h
@@ -1,5 +1,5 @@ 
 /*
- * Copyright (c) 2009, 2010, 2012, 2013 Nicira, Inc.
+ * Copyright (c) 2009, 2010, 2012, 2013, 2017 Nicira, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -99,6 +99,8 @@  struct json *jsonrpc_msg_to_json(struct jsonrpc_msg *);
 /* A JSON-RPC session with reconnection. */
 
 struct jsonrpc_session *jsonrpc_session_open(const char *name, bool retry);
+struct jsonrpc_session *jsonrpc_session_open_multiple(const char **remotes,
+                                                      size_t n, bool retry);
 struct jsonrpc_session *jsonrpc_session_open_unreliably(struct jsonrpc *,
                                                         uint8_t);
 void jsonrpc_session_close(struct jsonrpc_session *);
@@ -108,6 +110,7 @@  void jsonrpc_session_wait(struct jsonrpc_session *);
 
 size_t jsonrpc_session_get_backlog(const struct jsonrpc_session *);
 const char *jsonrpc_session_get_name(const struct jsonrpc_session *);
+size_t jsonrpc_session_get_n_remotes(const struct jsonrpc_session *);
 
 int jsonrpc_session_send(struct jsonrpc_session *, struct jsonrpc_msg *);
 struct jsonrpc_msg *jsonrpc_session_recv(struct jsonrpc_session *);