[ovs-dev,03/15] jsonrpc: Allow jsonrpc_session to have more than one remote.

Message ID 20180101051640.13043-3-blp@ovn.org
State New
Headers show
Series
  • [ovs-dev,01/15] log: Add async commit support.
Related show

Commit Message

Ben Pfaff Jan. 1, 2018, 5:16 a.m.
The implementation cycles through the remotes in random order.  This allows
clients to perform some load balancing across alternative implementations
of a service.

Signed-off-by: Ben Pfaff <blp@ovn.org>
Acked-by: Russell Bryant <russell@ovn.org>
---
 lib/jsonrpc.c | 53 ++++++++++++++++++++++++++++++++++++++++++++++++-----
 lib/jsonrpc.h |  6 +++++-
 lib/svec.c    | 18 ++++++++++++++++++
 lib/svec.h    |  1 +
 4 files changed, 72 insertions(+), 6 deletions(-)

Comments

Justin Pettit Jan. 13, 2018, 12:34 a.m. | #1
> On Dec 31, 2017, at 9:16 PM, Ben Pfaff <blp@ovn.org> wrote:
> 
> diff --git a/lib/jsonrpc.c b/lib/jsonrpc.c
> index 87ca1aa8690c..a8e5bc8434ad 100644
> --- a/lib/jsonrpc.c
> +++ b/lib/jsonrpc.c
> @@ -1,5 +1,5 @@
> /*
> - * Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2015, 2016 Nicira, Inc.
> + * Copyright (c) 2009-2017 Nicira, Inc.
>  *
>  * Licensed under the Apache License, Version 2.0 (the "License");
>  * you may not use this file except in compliance with the License.
> @@ -28,8 +28,10 @@
> #include "openvswitch/ofpbuf.h"
> #include "ovs-thread.h"
> #include "openvswitch/poll-loop.h"
> +#include "random.h"

Do you need this, since I think "random.h" information is only used in "svec.c".

Acked-by: Justin Pettit <jpettit@ovn.org>

--Justin

Patch

diff --git a/lib/jsonrpc.c b/lib/jsonrpc.c
index 87ca1aa8690c..a8e5bc8434ad 100644
--- a/lib/jsonrpc.c
+++ b/lib/jsonrpc.c
@@ -1,5 +1,5 @@ 
 /*
- * Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2015, 2016 Nicira, Inc.
+ * Copyright (c) 2009-2017 Nicira, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -28,8 +28,10 @@ 
 #include "openvswitch/ofpbuf.h"
 #include "ovs-thread.h"
 #include "openvswitch/poll-loop.h"
+#include "random.h"
 #include "reconnect.h"
 #include "stream.h"
+#include "svec.h"
 #include "timeval.h"
 #include "openvswitch/vlog.h"
 
@@ -753,6 +755,9 @@  jsonrpc_msg_to_json(struct jsonrpc_msg *m)
 /* A JSON-RPC session with reconnection. */
 
 struct jsonrpc_session {
+    struct svec remotes;
+    size_t next_remote;
+
     struct reconnect *reconnect;
     struct jsonrpc *rpc;
     struct stream *stream;
@@ -762,6 +767,13 @@  struct jsonrpc_session {
     uint8_t dscp;
 };
 
+static void
+jsonrpc_session_pick_remote(struct jsonrpc_session *s)
+{
+    reconnect_set_name(s->reconnect,
+                       s->remotes.names[s->next_remote++ % s->remotes.n]);
+}
+
 /* Creates and returns a jsonrpc_session to 'name', which should be a string
  * acceptable to stream_open() or pstream_open().
  *
@@ -779,12 +791,27 @@  struct jsonrpc_session {
 struct jsonrpc_session *
 jsonrpc_session_open(const char *name, bool retry)
 {
+    const struct svec remotes = { .names = (char **) &name, .n = 1 };
+    return jsonrpc_session_open_multiple(&remotes, retry);
+}
+
+struct jsonrpc_session *
+jsonrpc_session_open_multiple(const struct svec *remotes, bool retry)
+{
     struct jsonrpc_session *s;
 
     s = xmalloc(sizeof *s);
+
+    /* Set 'n' remotes from 'names', shuffling them into random order. */
+    ovs_assert(remotes->n > 0);
+    svec_clone(&s->remotes, remotes);
+    svec_shuffle(&s->remotes);
+    s->next_remote = 0;
+
     s->reconnect = reconnect_create(time_msec());
-    reconnect_set_name(s->reconnect, name);
+    jsonrpc_session_pick_remote(s);
     reconnect_enable(s->reconnect, time_msec());
+    reconnect_set_backoff_free_tries(s->reconnect, remotes->n);
     s->rpc = NULL;
     s->stream = NULL;
     s->pstream = NULL;
@@ -792,10 +819,11 @@  jsonrpc_session_open(const char *name, bool retry)
     s->dscp = 0;
     s->last_error = 0;
 
+    const char *name = reconnect_get_name(s->reconnect);
     if (!pstream_verify_name(name)) {
         reconnect_set_passive(s->reconnect, true, time_msec());
     } else if (!retry) {
-        reconnect_set_max_tries(s->reconnect, 1);
+        reconnect_set_max_tries(s->reconnect, remotes->n);
         reconnect_set_backoff(s->reconnect, INT_MAX, INT_MAX);
     }
 
@@ -817,6 +845,9 @@  jsonrpc_session_open_unreliably(struct jsonrpc *jsonrpc, uint8_t dscp)
     struct jsonrpc_session *s;
 
     s = xmalloc(sizeof *s);
+    svec_init(&s->remotes);
+    svec_add(&s->remotes, jsonrpc_get_name(jsonrpc));
+    s->next_remote = 0;
     s->reconnect = reconnect_create(time_msec());
     reconnect_set_quiet(s->reconnect, true);
     reconnect_set_name(s->reconnect, jsonrpc_get_name(jsonrpc));
@@ -839,6 +870,7 @@  jsonrpc_session_close(struct jsonrpc_session *s)
         reconnect_destroy(s->reconnect);
         stream_close(s->stream);
         pstream_close(s->pstream);
+        svec_destroy(&s->remotes);
         free(s);
     }
 }
@@ -850,12 +882,15 @@  jsonrpc_session_disconnect(struct jsonrpc_session *s)
         jsonrpc_error(s->rpc, EOF);
         jsonrpc_close(s->rpc);
         s->rpc = NULL;
-        s->seqno++;
     } else if (s->stream) {
         stream_close(s->stream);
         s->stream = NULL;
-        s->seqno++;
+    } else {
+        return;
     }
+
+    s->seqno++;
+    jsonrpc_session_pick_remote(s);
 }
 
 static void
@@ -882,6 +917,7 @@  jsonrpc_session_connect(struct jsonrpc_session *s)
 
     if (error) {
         reconnect_connect_failed(s->reconnect, time_msec(), error);
+        jsonrpc_session_pick_remote(s);
     }
 }
 
@@ -946,6 +982,7 @@  jsonrpc_session_run(struct jsonrpc_session *s)
             s->seqno++;
         } else if (error != EAGAIN) {
             reconnect_connect_failed(s->reconnect, time_msec(), error);
+            jsonrpc_session_pick_remote(s);
             stream_close(s->stream);
             s->stream = NULL;
             s->last_error = error;
@@ -1016,6 +1053,12 @@  jsonrpc_session_get_id(const struct jsonrpc_session *s)
     }
 }
 
+size_t
+jsonrpc_session_get_n_remotes(const struct jsonrpc_session *s)
+{
+    return s->remotes.n;
+}
+
 /* Always takes ownership of 'msg', regardless of success. */
 int
 jsonrpc_session_send(struct jsonrpc_session *s, struct jsonrpc_msg *msg)
diff --git a/lib/jsonrpc.h b/lib/jsonrpc.h
index 9b4fb0e51374..969a6ed38cd6 100644
--- a/lib/jsonrpc.h
+++ b/lib/jsonrpc.h
@@ -1,5 +1,5 @@ 
 /*
- * Copyright (c) 2009, 2010, 2012, 2013 Nicira, Inc.
+ * Copyright (c) 2009, 2010, 2012, 2013, 2017 Nicira, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -29,6 +29,7 @@  struct jsonrpc_msg;
 struct pstream;
 struct reconnect_stats;
 struct stream;
+struct svec;
 
 /* API for a JSON-RPC stream. */
 
@@ -99,6 +100,8 @@  struct json *jsonrpc_msg_to_json(struct jsonrpc_msg *);
 /* A JSON-RPC session with reconnection. */
 
 struct jsonrpc_session *jsonrpc_session_open(const char *name, bool retry);
+struct jsonrpc_session *jsonrpc_session_open_multiple(const struct svec *,
+                                                      bool retry);
 struct jsonrpc_session *jsonrpc_session_open_unreliably(struct jsonrpc *,
                                                         uint8_t);
 void jsonrpc_session_close(struct jsonrpc_session *);
@@ -108,6 +111,7 @@  void jsonrpc_session_wait(struct jsonrpc_session *);
 
 size_t jsonrpc_session_get_backlog(const struct jsonrpc_session *);
 const char *jsonrpc_session_get_name(const struct jsonrpc_session *);
+size_t jsonrpc_session_get_n_remotes(const struct jsonrpc_session *);
 
 int jsonrpc_session_send(struct jsonrpc_session *, struct jsonrpc_msg *);
 struct jsonrpc_msg *jsonrpc_session_recv(struct jsonrpc_session *);
diff --git a/lib/svec.c b/lib/svec.c
index 297a60ce14f9..c1b986bab108 100644
--- a/lib/svec.c
+++ b/lib/svec.c
@@ -20,6 +20,7 @@ 
 #include <stdlib.h>
 #include <string.h>
 #include "openvswitch/dynamic-string.h"
+#include "random.h"
 #include "util.h"
 #include "openvswitch/vlog.h"
 
@@ -174,6 +175,23 @@  svec_compact(struct svec *svec)
     svec->n = j;
 }
 
+static void
+swap_strings(char **a, char **b)
+{
+    char *tmp = *a;
+    *a = *b;
+    *b = tmp;
+}
+
+void
+svec_shuffle(struct svec *svec)
+{
+    for (size_t i = 0; i < svec->n; i++) {
+        size_t j = i + random_range(svec->n - i);
+        swap_strings(&svec->names[i], &svec->names[j]);
+    }
+}
+
 void
 svec_diff(const struct svec *a, const struct svec *b,
           struct svec *a_only, struct svec *both, struct svec *b_only)
diff --git a/lib/svec.h b/lib/svec.h
index 341e26989801..b4e1343a9069 100644
--- a/lib/svec.h
+++ b/lib/svec.h
@@ -46,6 +46,7 @@  void svec_sort(struct svec *);
 void svec_sort_unique(struct svec *);
 void svec_unique(struct svec *);
 void svec_compact(struct svec *);
+void svec_shuffle(struct svec *);
 void svec_diff(const struct svec *a, const struct svec *b,
                struct svec *a_only, struct svec *both, struct svec *b_only);
 bool svec_contains(const struct svec *, const char *);