[ovs-dev,4/6] connmgr: Make treatment of active and passive connections more uniform.

Message ID 20181029225751.5936-4-blp@ovn.org
State New
Headers show
Series
  • [ovs-dev,1/6] connmgr: Modernize coding style.
Related show

Commit Message

Ben Pfaff Oct. 29, 2018, 10:57 p.m.
Until now, connmgr has handled active and passive OpenFlow connections in
quite different ways.  Any active connection, whether it was currently
connected or not, was always maintained as an ofconn.  Whenever such a
connection (re)connected, its settings were cleared.  On the other hand,
passive connections had a separate listener which created an ofconn when
a new connection came in, and these ofconns would be deleted when such a
connection was closed.  This approach is inelegant and has occasionally
led to bugs when reconnection didn't clear all of the state that it
should have.

There's another motivation here.  Currently, active connections are
always primary controllers and passive connections are always service
controllers (as documented in ovs-vswitchd.conf.db(5)).  Sometimes it would
be useful to have passive primary controllers (maybe active service
controllers too but I haven't personally run into that use case).  As is,
this is difficult to implement because there is so much different code in
use between active and passive connections.  This commit will make it
easier.

Signed-off-by: Ben Pfaff <blp@ovn.org>
---
 ofproto/connmgr.c | 615 +++++++++++++++++++++++++++---------------------------
 vswitchd/bridge.c |   2 +-
 2 files changed, 310 insertions(+), 307 deletions(-)

Patch

diff --git a/ofproto/connmgr.c b/ofproto/connmgr.c
index f5576d50524d..30d543d220e5 100644
--- a/ofproto/connmgr.c
+++ b/ofproto/connmgr.c
@@ -57,20 +57,17 @@  static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
  * connmgr.  'ofproto_mutex' doesn't protect the data inside the ofconn, except
  * as specifically noted below. */
 struct ofconn {
-/* Configuration that persists from one connection to the next. */
+    struct connmgr *connmgr;    /* Connection's manager. */
+    struct ovs_list connmgr_node; /* In connmgr->conns. */
 
-    struct ovs_list node;       /* In struct connmgr's "all_conns" list. */
-    struct hmap_node hmap_node; /* In struct connmgr's "controllers" map. */
+    struct ofservice *ofservice;    /* Connection's service. */
+    struct ovs_list ofservice_node; /* In service->conns. */
 
-    struct connmgr *connmgr;    /* Connection's manager. */
     struct rconn *rconn;        /* OpenFlow connection. */
     enum ofconn_type type;      /* Type. */
     enum ofproto_band band;     /* In-band or out-of-band? */
-    bool enable_async_msgs;     /* Initially enable async messages? */
     bool want_packet_in_on_miss;
 
-/* State that should be cleared from one connection to the next. */
-
     /* OpenFlow state. */
     enum ofp12_controller_role role;           /* Role. */
     enum ofputil_protocol protocol; /* Current protocol variant. */
@@ -145,11 +142,10 @@  struct ofconn {
 
 static unsigned int bundle_idle_timeout = BUNDLE_IDLE_TIMEOUT_DEFAULT;
 
-static struct ofconn *ofconn_create(struct connmgr *, struct rconn *,
-                                    enum ofconn_type, bool enable_async_msgs)
-    OVS_REQUIRES(ofproto_mutex);
+static void ofconn_create(struct ofservice *, struct rconn *, enum ofconn_type,
+                          const struct ofproto_controller *settings)
+    OVS_EXCLUDED(ofproto_mutex);
 static void ofconn_destroy(struct ofconn *) OVS_REQUIRES(ofproto_mutex);
-static void ofconn_flush(struct ofconn *) OVS_REQUIRES(ofproto_mutex);
 
 static void ofconn_reconfigure(struct ofconn *,
                                const struct ofproto_controller *);
@@ -161,7 +157,6 @@  static void ofconn_wait(struct ofconn *);
 
 static void ofconn_log_flow_mods(struct ofconn *);
 
-static const char *ofconn_get_target(const struct ofconn *);
 static char *ofconn_make_name(const struct connmgr *, const char *target);
 
 static void ofconn_set_rate_limit(struct ofconn *, int rate, int burst);
@@ -171,27 +166,33 @@  static void ofconn_send(const struct ofconn *, struct ofpbuf *,
 
 static void do_send_packet_ins(struct ofconn *, struct ovs_list *txq);
 
-/* A listener for incoming OpenFlow "service" connections. */
+/* A listener for incoming OpenFlow connections or for establishing an
+ * outgoing connection. */
 struct ofservice {
-    struct hmap_node node;      /* In struct connmgr's "services" hmap. */
-    struct pvconn *pvconn;      /* OpenFlow connection listener. */
-
-    /* These are not used by ofservice directly.  They are settings for
-     * accepted "struct ofconn"s from the pvconn. */
-    int probe_interval;         /* Max idle time before probing, in seconds. */
-    int rate_limit;             /* Max packet-in rate in packets per second. */
-    int burst_limit;            /* Limit on accumulating packet credits. */
-    bool enable_async_msgs;     /* Initially enable async messages? */
-    uint8_t dscp;               /* DSCP Value for controller connection */
-    uint32_t allowed_versions;  /* OpenFlow protocol versions that may
-                                 * be negotiated for a session. */
+    struct hmap_node hmap_node; /* In connmgr->services, by target. */
+    struct connmgr *connmgr;
+
+    char *target;               /* e.g. "tcp:..." or "pssl:...". */
+    struct ovs_list conns;      /* "ofconn"s generated by this service. */
+    enum ofconn_type type;      /* OFCONN_PRIMARY or OFCONN_SERVICE. */
+
+    /* Source of connections. */
+    struct rconn *rconn;        /* Active connection only. */
+    struct pvconn *pvconn;      /* Passive listener only. */
+
+    /* Settings for "struct ofconn"s established by this service. */
+    struct ofproto_controller s;
 };
 
+static void ofservice_run(struct ofservice *);
+static void ofservice_wait(struct ofservice *);
 static void ofservice_reconfigure(struct ofservice *,
-                                  const struct ofproto_controller *);
-static int ofservice_create(struct connmgr *mgr, const char *target,
-                            uint32_t allowed_versions, uint8_t dscp);
-static void ofservice_destroy(struct connmgr *, struct ofservice *);
+                                  const struct ofproto_controller *)
+    OVS_REQUIRES(ofproto_mutex);
+static void ofservice_create(struct connmgr *mgr, const char *target,
+                             const struct ofproto_controller *)
+    OVS_REQUIRES(ofproto_mutex);
+static void ofservice_destroy(struct ofservice *) OVS_REQUIRES(ofproto_mutex);
 static struct ofservice *ofservice_lookup(struct connmgr *,
                                           const char *target);
 
@@ -201,17 +202,17 @@  struct connmgr {
     char *name;
     char *local_port_name;
 
-    /* OpenFlow connections. */
-    struct hmap controllers;     /* All OFCONN_PRIMARY controllers. */
-    struct ovs_list all_conns;   /* All controllers.  All modifications are
-                                    protected by ofproto_mutex, so that any
-                                    traversals from other threads can be made
-                                    safe by holding the ofproto_mutex. */
+    /* OpenFlow connections.
+     *
+     * All modifications to 'conns' protected by ofproto_mutex, so that any
+     * traversals from other threads can be made safe by holding the
+     * ofproto_mutex.*/
+    struct ovs_list conns;       /* All ofconns. */
     uint64_t master_election_id; /* monotonically increasing sequence number
                                   * for master election */
     bool master_election_id_defined;
 
-    /* OpenFlow listeners. */
+    /* OpenFlow connection establishment. */
     struct hmap services;       /* Contains "struct ofservice"s. */
     struct pvconn **snoops;
     size_t n_snoops;
@@ -247,8 +248,7 @@  connmgr_create(struct ofproto *ofproto,
     mgr->name = xstrdup(name);
     mgr->local_port_name = xstrdup(local_port_name);
 
-    hmap_init(&mgr->controllers);
-    ovs_list_init(&mgr->all_conns);
+    ovs_list_init(&mgr->conns);
     mgr->master_election_id = 0;
     mgr->master_election_id_defined = false;
 
@@ -306,18 +306,12 @@  connmgr_destroy(struct connmgr *mgr)
         return;
     }
 
-    struct ofconn *ofconn, *next_ofconn;
-    LIST_FOR_EACH_SAFE (ofconn, next_ofconn, node, &mgr->all_conns) {
-        ofconn_destroy(ofconn);
-    }
-
-    hmap_destroy(&mgr->controllers);
-
     struct ofservice *ofservice, *next_ofservice;
-    HMAP_FOR_EACH_SAFE (ofservice, next_ofservice, node, &mgr->services) {
-        ofservice_destroy(mgr, ofservice);
+    HMAP_FOR_EACH_SAFE (ofservice, next_ofservice, hmap_node, &mgr->services) {
+        ofservice_destroy(ofservice);
     }
     hmap_destroy(&mgr->services);
+    ovs_assert(ovs_list_is_empty(&mgr->conns));
 
     for (size_t i = 0; i < mgr->n_snoops; i++) {
         pvconn_close(mgr->snoops[i]);
@@ -354,7 +348,7 @@  connmgr_run(struct connmgr *mgr,
     }
 
     struct ofconn *ofconn, *next_ofconn;
-    LIST_FOR_EACH_SAFE (ofconn, next_ofconn, node, &mgr->all_conns) {
+    LIST_FOR_EACH_SAFE (ofconn, next_ofconn, connmgr_node, &mgr->conns) {
         ofconn_run(ofconn, handle_openflow);
     }
     ofmonitor_run(mgr);
@@ -366,28 +360,8 @@  connmgr_run(struct connmgr *mgr,
     }
 
     struct ofservice *ofservice;
-    HMAP_FOR_EACH (ofservice, node, &mgr->services) {
-        struct vconn *vconn;
-        int retval = pvconn_accept(ofservice->pvconn, &vconn);
-        if (!retval) {
-            /* Passing default value for creation of the rconn */
-            struct rconn *rconn = rconn_create(
-                ofservice->probe_interval, 0, ofservice->dscp,
-                vconn_get_allowed_versions(vconn));
-            char *name = ofconn_make_name(mgr, vconn_get_name(vconn));
-            rconn_connect_unreliably(rconn, vconn, name);
-            free(name);
-
-            ovs_mutex_lock(&ofproto_mutex);
-            ofconn = ofconn_create(mgr, rconn, OFCONN_SERVICE,
-                                   ofservice->enable_async_msgs);
-            ovs_mutex_unlock(&ofproto_mutex);
-
-            ofconn_set_rate_limit(ofconn, ofservice->rate_limit,
-                                  ofservice->burst_limit);
-        } else if (retval != EAGAIN) {
-            VLOG_WARN_RL(&rl, "accept failed (%s)", ovs_strerror(retval));
-        }
+    HMAP_FOR_EACH (ofservice, hmap_node, &mgr->services) {
+        ofservice_run(ofservice);
     }
 
     for (size_t i = 0; i < mgr->n_snoops; i++) {
@@ -406,7 +380,7 @@  void
 connmgr_wait(struct connmgr *mgr)
 {
     struct ofconn *ofconn;
-    LIST_FOR_EACH (ofconn, node, &mgr->all_conns) {
+    LIST_FOR_EACH (ofconn, connmgr_node, &mgr->conns) {
         ofconn_wait(ofconn);
     }
 
@@ -421,8 +395,8 @@  connmgr_wait(struct connmgr *mgr)
     }
 
     struct ofservice *ofservice;
-    HMAP_FOR_EACH (ofservice, node, &mgr->services) {
-        pvconn_wait(ofservice->pvconn);
+    HMAP_FOR_EACH (ofservice, hmap_node, &mgr->services) {
+        ofservice_wait(ofservice);
     }
 
     for (size_t i = 0; i < mgr->n_snoops; i++) {
@@ -438,8 +412,8 @@  connmgr_get_memory_usage(const struct connmgr *mgr, struct simap *usage)
     unsigned int packets = 0;
     unsigned int ofconns = 0;
 
-    const struct ofconn *ofconn;
-    LIST_FOR_EACH (ofconn, node, &mgr->all_conns) {
+    struct ofconn *ofconn;
+    LIST_FOR_EACH (ofconn, connmgr_node, &mgr->conns) {
         ofconns++;
 
         packets += rconn_count_txqlen(ofconn->rconn);
@@ -475,11 +449,6 @@  connmgr_set_bundle_idle_timeout(unsigned timeout)
 
 /* OpenFlow configuration. */
 
-static void add_controller(struct connmgr *, const char *target, uint8_t dscp,
-                           uint32_t allowed_versions)
-    OVS_REQUIRES(ofproto_mutex);
-static struct ofconn *find_controller_by_target(struct connmgr *,
-                                                const char *target);
 static void update_fail_open(struct connmgr *) OVS_EXCLUDED(ofproto_mutex);
 static int set_pvconns(struct pvconn ***pvconnsp, size_t *n_pvconnsp,
                        const struct sset *);
@@ -491,7 +460,22 @@  static int set_pvconns(struct pvconn ***pvconnsp, size_t *n_pvconnsp,
 bool
 connmgr_has_controllers(const struct connmgr *mgr)
 {
-    return !hmap_is_empty(&mgr->controllers);
+    struct ofservice *ofservice;
+    HMAP_FOR_EACH (ofservice, hmap_node, &mgr->services) {
+        if (ofservice->type == OFCONN_PRIMARY) {
+            return true;
+        }
+    }
+    return false;
+}
+
+static struct ofconn *
+ofservice_first_conn(const struct ofservice *ofservice)
+{
+    return (ovs_list_is_empty(&ofservice->conns)
+            ? NULL
+            : CONTAINER_OF(ofservice->conns.next,
+                           struct ofconn, ofservice_node));
 }
 
 /* Initializes 'info' and populates it with information about each configured
@@ -503,13 +487,16 @@  connmgr_has_controllers(const struct connmgr *mgr)
 void
 connmgr_get_controller_info(struct connmgr *mgr, struct shash *info)
 {
-    const struct ofconn *ofconn;
-
-    HMAP_FOR_EACH (ofconn, hmap_node, &mgr->controllers) {
-        const struct rconn *rconn = ofconn->rconn;
+    struct ofservice *ofservice;
+    HMAP_FOR_EACH (ofservice, hmap_node, &mgr->services) {
+        const struct rconn *rconn = ofservice->rconn;
+        if (!rconn) {
+            continue;
+        }
         const char *target = rconn_get_target(rconn);
 
         if (!shash_find(info, target)) {
+            struct ofconn *ofconn = ofservice_first_conn(ofservice);
             struct ofproto_controller_info *cinfo = xmalloc(sizeof *cinfo);
             time_t now = time_now();
             time_t last_connection = rconn_get_last_connection(rconn);
@@ -520,7 +507,7 @@  connmgr_get_controller_info(struct connmgr *mgr, struct shash *info)
             shash_add(info, target, cinfo);
 
             cinfo->is_connected = rconn_is_connected(rconn);
-            cinfo->role = ofconn->role;
+            cinfo->role = ofconn ? ofconn->role : OFPCR12_ROLE_NOCHANGE;
 
             smap_init(&cinfo->pairs);
             if (last_error) {
@@ -541,7 +528,7 @@  connmgr_get_controller_info(struct connmgr *mgr, struct shash *info)
             }
 
             for (i = 0; i < N_SCHEDULERS; i++) {
-                if (ofconn->schedulers[i]) {
+                if (ofconn && ofconn->schedulers[i]) {
                     const char *name = i ? "miss" : "action";
                     struct pinsched_stats stats;
 
@@ -589,76 +576,27 @@  connmgr_set_controllers(struct connmgr *mgr, struct shash *controllers)
      * cover a smaller amount of code, if that yielded some benefit. */
     ovs_mutex_lock(&ofproto_mutex);
 
-    /* Create newly configured controllers and services. */
+    /* Create newly configured services. */
     struct shash_node *node;
     SHASH_FOR_EACH (node, controllers) {
         const char *target = node->name;
         const struct ofproto_controller *c = node->data;
-
-        if (!vconn_verify_name(target)) {
-            bool add = false;
-            struct ofconn *ofconn = find_controller_by_target(mgr, target);
-            if (!ofconn) {
-                VLOG_INFO("%s: added primary controller \"%s\"",
-                          mgr->name, target);
-                add = true;
-            } else if (rconn_get_allowed_versions(ofconn->rconn) !=
-                       c->allowed_versions) {
-                VLOG_INFO("%s: re-added primary controller \"%s\"",
-                          mgr->name, target);
-                add = true;
-                ofconn_destroy(ofconn);
-            }
-            if (add) {
-                add_controller(mgr, target, c->dscp, c->allowed_versions);
-            }
-        } else if (!pvconn_verify_name(target)) {
-            bool add = false;
-            struct ofservice *ofservice = ofservice_lookup(mgr, target);
-            if (!ofservice) {
-                VLOG_INFO("%s: added service controller \"%s\"",
-                          mgr->name, target);
-                add = true;
-            } else if (ofservice->allowed_versions != c->allowed_versions) {
-                VLOG_INFO("%s: re-added service controller \"%s\"",
-                          mgr->name, target);
-                ofservice_destroy(mgr, ofservice);
-                add = true;
-            }
-            if (add) {
-                ofservice_create(mgr, target, c->allowed_versions, c->dscp);
-            }
-        } else {
-            VLOG_WARN_RL(&rl, "%s: unsupported controller \"%s\"",
-                         mgr->name, target);
-        }
-    }
-
-    /* Delete controllers that are no longer configured.
-     * Update configuration of all now-existing controllers. */
-    struct ofconn *ofconn, *next_ofconn;
-    HMAP_FOR_EACH_SAFE (ofconn, next_ofconn, hmap_node, &mgr->controllers) {
-        const char *target = ofconn_get_target(ofconn);
-        struct ofproto_controller *c = shash_find_data(controllers, target);
-        if (!c) {
-            VLOG_INFO("%s: removed primary controller \"%s\"",
-                      mgr->name, target);
-            ofconn_destroy(ofconn);
-        } else {
-            ofconn_reconfigure(ofconn, c);
+        if (!ofservice_lookup(mgr, target)) {
+            ofservice_create(mgr, target, c);
         }
     }
 
     /* Delete services that are no longer configured.
      * Update configuration of all now-existing services. */
     struct ofservice *ofservice, *next_ofservice;
-    HMAP_FOR_EACH_SAFE (ofservice, next_ofservice, node, &mgr->services) {
-        const char *target = pvconn_get_name(ofservice->pvconn);
+    HMAP_FOR_EACH_SAFE (ofservice, next_ofservice, hmap_node, &mgr->services) {
+        const char *target = ofservice->target;
         struct ofproto_controller *c = shash_find_data(controllers, target);
         if (!c) {
-            VLOG_INFO("%s: removed service controller \"%s\"",
-                      mgr->name, target);
-            ofservice_destroy(mgr, ofservice);
+            VLOG_INFO("%s: removed %s controller \"%s\"",
+                      mgr->name, ofconn_type_to_string(ofservice->type),
+                      target);
+            ofservice_destroy(ofservice);
         } else {
             ofservice_reconfigure(ofservice, c);
         }
@@ -680,7 +618,7 @@  connmgr_reconnect(const struct connmgr *mgr)
 {
     struct ofconn *ofconn;
 
-    LIST_FOR_EACH (ofconn, node, &mgr->all_conns) {
+    LIST_FOR_EACH (ofconn, connmgr_node, &mgr->conns) {
         rconn_reconnect(ofconn->rconn);
     }
 }
@@ -711,55 +649,24 @@  connmgr_has_snoops(const struct connmgr *mgr)
     return mgr->n_snoops > 0;
 }
 
-/* Creates a new controller for 'target' in 'mgr'.  update_controller() needs
- * to be called later to finish the new ofconn's configuration. */
-static void
-add_controller(struct connmgr *mgr, const char *target, uint8_t dscp,
-               uint32_t allowed_versions)
-    OVS_REQUIRES(ofproto_mutex)
-{
-    char *name = ofconn_make_name(mgr, target);
-    struct ofconn *ofconn = ofconn_create(mgr, rconn_create(5, 8, dscp,
-                                                            allowed_versions),
-                                          OFCONN_PRIMARY, true);
-    rconn_connect(ofconn->rconn, target, name);
-    hmap_insert(&mgr->controllers, &ofconn->hmap_node, hash_string(target, 0));
-
-    free(name);
-}
-
-static struct ofconn *
-find_controller_by_target(struct connmgr *mgr, const char *target)
-{
-    struct ofconn *ofconn;
-
-    HMAP_FOR_EACH_WITH_HASH (ofconn, hmap_node,
-                             hash_string(target, 0), &mgr->controllers) {
-        if (!strcmp(ofconn_get_target(ofconn), target)) {
-            return ofconn;
-        }
-    }
-    return NULL;
-}
-
 static void
 update_in_band_remotes(struct connmgr *mgr)
 {
     /* Allocate enough memory for as many remotes as we could possibly have. */
-    size_t max_addrs = mgr->n_extra_remotes + hmap_count(&mgr->controllers);
+    size_t max_addrs = mgr->n_extra_remotes + hmap_count(&mgr->services);
     struct sockaddr_in *addrs = xmalloc(max_addrs * sizeof *addrs);
     size_t n_addrs = 0;
 
     /* Add all the remotes. */
-    struct ofconn *ofconn;
-    HMAP_FOR_EACH (ofconn, hmap_node, &mgr->controllers) {
-        const char *target = rconn_get_target(ofconn->rconn);
+    struct ofservice *ofservice;
+    HMAP_FOR_EACH (ofservice, hmap_node, &mgr->services) {
+        const char *target = ofservice->target;
         union {
             struct sockaddr_storage ss;
             struct sockaddr_in in;
         } sa;
 
-        if (ofconn->band == OFPROTO_IN_BAND
+        if (ofservice->s.band == OFPROTO_IN_BAND
             && stream_parse_target_with_default_port(target, OFP_PORT, &sa.ss)
             && sa.ss.ss_family == AF_INET) {
             addrs[n_addrs++] = sa.in;
@@ -846,8 +753,13 @@  set_pvconns(struct pvconn ***pvconnsp, size_t *n_pvconnsp,
  * means that 'ofconn' is more interesting for monitoring than a lower return
  * value. */
 static int
-snoop_preference(const struct ofconn *ofconn)
+snoop_preference(const struct ofservice *ofservice)
 {
+    struct ofconn *ofconn = ofservice_first_conn(ofservice);
+    if (!ofconn) {
+        return 0;
+    }
+
     switch (ofconn->role) {
     case OFPCR12_ROLE_MASTER:
         return 3;
@@ -868,12 +780,12 @@  static void
 add_snooper(struct connmgr *mgr, struct vconn *vconn)
 {
     /* Pick a controller for monitoring. */
-    struct ofconn *best = NULL;
-    struct ofconn *ofconn;
-    LIST_FOR_EACH (ofconn, node, &mgr->all_conns) {
-        if (ofconn->type == OFCONN_PRIMARY
-            && (!best || snoop_preference(ofconn) > snoop_preference(best))) {
-            best = ofconn;
+    struct ofservice *best = NULL;
+    struct ofservice *ofservice;
+    HMAP_FOR_EACH (ofservice, hmap_node, &mgr->services) {
+        if (ofservice->rconn &&
+            (!best || snoop_preference(ofservice) > snoop_preference(best))) {
+            best = ofservice;
         }
     }
 
@@ -957,7 +869,7 @@  ofconn_set_role(struct ofconn *ofconn, enum ofp12_controller_role role)
     if (role != ofconn->role && role == OFPCR12_ROLE_MASTER) {
         struct ofconn *other;
 
-        LIST_FOR_EACH (other, node, &ofconn->connmgr->all_conns) {
+        LIST_FOR_EACH (other, connmgr_node, &ofconn->connmgr->conns) {
             if (other->role == OFPCR12_ROLE_MASTER) {
                 other->role = OFPCR12_ROLE_SLAVE;
                 ofconn_send_role_status(other, OFPCR12_ROLE_SLAVE,
@@ -1095,7 +1007,7 @@  ofconn_get_async_config(const struct ofconn *ofconn)
     }
 
     int version = rconn_get_version(ofconn->rconn);
-    return (version < 0 || !ofconn->enable_async_msgs
+    return (version < 0 || !ofconn->ofservice->s.enable_async_msgs
             ? OFPUTIL_ASYNC_CFG_INIT
             : ofputil_async_cfg_default(version));
 }
@@ -1236,67 +1148,37 @@  bundle_remove_expired(struct ofconn *ofconn, long long int now)
 
 /* Private ofconn functions. */
 
-static const char *
-ofconn_get_target(const struct ofconn *ofconn)
+static void
+ofconn_create(struct ofservice *ofservice, struct rconn *rconn,
+              enum ofconn_type type, const struct ofproto_controller *settings)
+    OVS_EXCLUDED(ofproto_mutex)
 {
-    return rconn_get_target(ofconn->rconn);
-}
+    ovs_mutex_lock(&ofproto_mutex);
 
-static struct ofconn *
-ofconn_create(struct connmgr *mgr, struct rconn *rconn, enum ofconn_type type,
-              bool enable_async_msgs)
-    OVS_REQUIRES(ofproto_mutex)
-{
     struct ofconn *ofconn = xzalloc(sizeof *ofconn);
-    ofconn->connmgr = mgr;
-    ovs_list_push_back(&mgr->all_conns, &ofconn->node);
-    ofconn->rconn = rconn;
-    ofconn->type = type;
-    ofconn->enable_async_msgs = enable_async_msgs;
-
-    hmap_init(&ofconn->monitors);
-    ovs_list_init(&ofconn->updates);
 
-    hmap_init(&ofconn->bundles);
-    ofconn->next_bundle_expiry_check = time_msec() + BUNDLE_EXPIRY_INTERVAL;
+    ofconn->connmgr = ofservice->connmgr;
+    ovs_list_push_back(&ofservice->connmgr->conns, &ofconn->connmgr_node);
 
-    ofconn_flush(ofconn);
+    ofconn->ofservice = ofservice;
+    ovs_list_push_back(&ofservice->conns, &ofconn->ofservice_node);
 
-    return ofconn;
-}
-
-/* Clears all of the state in 'ofconn' that should not persist from one
- * connection to the next. */
-static void
-ofconn_flush(struct ofconn *ofconn)
-    OVS_REQUIRES(ofproto_mutex)
-{
-    ofconn_log_flow_mods(ofconn);
+    ofconn->rconn = rconn;
+    ofconn->type = type;
+    ofconn->band = settings->band;
 
     ofconn->role = OFPCR12_ROLE_EQUAL;
     ofconn_set_protocol(ofconn, OFPUTIL_P_NONE);
     ofconn->packet_in_format = OFPUTIL_PACKET_IN_STD;
 
-    rconn_packet_counter_destroy(ofconn->packet_in_counter);
     ofconn->packet_in_counter = rconn_packet_counter_create();
-    for (int i = 0; i < N_SCHEDULERS; i++) {
-        if (ofconn->schedulers[i]) {
-            int rate, burst;
-
-            pinsched_get_limits(ofconn->schedulers[i], &rate, &burst);
-            pinsched_destroy(ofconn->schedulers[i]);
-            ofconn->schedulers[i] = pinsched_create(rate, burst);
-        }
-    }
     ofconn->miss_send_len = (ofconn->type == OFCONN_PRIMARY
                              ? OFP_DEFAULT_MISS_SEND_LEN
                              : 0);
     ofconn->controller_id = 0;
 
-    rconn_packet_counter_destroy(ofconn->reply_counter);
     ofconn->reply_counter = rconn_packet_counter_create();
 
-    free(ofconn->async_cfg);
     ofconn->async_cfg = NULL;
 
     ofconn->n_add = ofconn->n_delete = ofconn->n_modify = 0;
@@ -1304,59 +1186,81 @@  ofconn_flush(struct ofconn *ofconn)
     ofconn->next_op_report = LLONG_MAX;
     ofconn->op_backoff = LLONG_MIN;
 
-    struct ofmonitor *monitor, *next_monitor;
-    HMAP_FOR_EACH_SAFE (monitor, next_monitor, ofconn_node,
-                        &ofconn->monitors) {
-        ofmonitor_destroy(monitor);
-    }
-    rconn_packet_counter_destroy(ofconn->monitor_counter);
+    hmap_init(&ofconn->monitors);
     ofconn->monitor_counter = rconn_packet_counter_create();
-    ofpbuf_list_delete(&ofconn->updates); /* ...but it should be empty. */
+
+    ovs_list_init(&ofconn->updates);
+
+    hmap_init(&ofconn->bundles);
+    ofconn->next_bundle_expiry_check = time_msec() + BUNDLE_EXPIRY_INTERVAL;
+
+    ofconn_set_rate_limit(ofconn, settings->rate_limit, settings->burst_limit);
+
+    ovs_mutex_unlock(&ofproto_mutex);
 }
 
 static void
 ofconn_destroy(struct ofconn *ofconn)
     OVS_REQUIRES(ofproto_mutex)
 {
-    ofconn_flush(ofconn);
+    if (!ofconn) {
+        return;
+    }
+
+    ofconn_log_flow_mods(ofconn);
+
+    ovs_list_remove(&ofconn->connmgr_node);
+    ovs_list_remove(&ofconn->ofservice_node);
+
+    if (ofconn->rconn != ofconn->ofservice->rconn) {
+        rconn_destroy(ofconn->rconn);
+    }
 
     /* Force clearing of want_packet_in_on_miss to keep the global count
      * accurate. */
     ofconn->controller_id = 1;
     update_want_packet_in_on_miss(ofconn);
 
-    if (ofconn->type == OFCONN_PRIMARY) {
-        hmap_remove(&ofconn->connmgr->controllers, &ofconn->hmap_node);
+    rconn_packet_counter_destroy(ofconn->packet_in_counter);
+    for (int i = 0; i < N_SCHEDULERS; i++) {
+        if (ofconn->schedulers[i]) {
+            pinsched_destroy(ofconn->schedulers[i]);
+        }
     }
 
-    bundle_remove_all(ofconn);
-    hmap_destroy(&ofconn->bundles);
+    rconn_packet_counter_destroy(ofconn->reply_counter);
+
+    free(ofconn->async_cfg);
 
+    struct ofmonitor *monitor, *next_monitor;
+    HMAP_FOR_EACH_SAFE (monitor, next_monitor, ofconn_node,
+                        &ofconn->monitors) {
+        ofmonitor_destroy(monitor);
+    }
     hmap_destroy(&ofconn->monitors);
-    ovs_list_remove(&ofconn->node);
-    rconn_destroy(ofconn->rconn);
-    rconn_packet_counter_destroy(ofconn->packet_in_counter);
-    rconn_packet_counter_destroy(ofconn->reply_counter);
     rconn_packet_counter_destroy(ofconn->monitor_counter);
+
+    ofpbuf_list_delete(&ofconn->updates); /* ...but it should be empty. */
+
+    bundle_remove_all(ofconn);
+    hmap_destroy(&ofconn->bundles);
+
     free(ofconn);
 }
 
-/* Reconfigures 'ofconn' to match 'c'.  'ofconn' and 'c' must have the same
- * target. */
+/* Reconfigures 'ofconn' to match 'c'. */
 static void
 ofconn_reconfigure(struct ofconn *ofconn, const struct ofproto_controller *c)
 {
-    ofconn->band = c->band;
-    ofconn->enable_async_msgs = c->enable_async_msgs;
-
     rconn_set_max_backoff(ofconn->rconn, c->max_backoff);
 
     int probe_interval = c->probe_interval ? MAX(c->probe_interval, 5) : 0;
     rconn_set_probe_interval(ofconn->rconn, probe_interval);
 
+    ofconn->band = c->band;
+
     ofconn_set_rate_limit(ofconn, c->rate_limit, c->burst_limit);
 
-    /* If dscp value changed reconnect. */
     if (c->dscp != rconn_get_dscp(ofconn->rconn)) {
         rconn_set_dscp(ofconn->rconn, c->dscp);
         rconn_reconnect(ofconn->rconn);
@@ -1415,10 +1319,10 @@  ofconn_run(struct ofconn *ofconn,
     }
 
     ovs_mutex_lock(&ofproto_mutex);
-    if (!rconn_is_alive(ofconn->rconn)) {
+    if (rconn_is_reliable(ofconn->rconn)
+        ? !rconn_is_connected(ofconn->rconn)
+        : !rconn_is_alive(ofconn->rconn)) {
         ofconn_destroy(ofconn);
-    } else if (!rconn_is_connected(ofconn->rconn)) {
-        ofconn_flush(ofconn);
     }
     ovs_mutex_unlock(&ofproto_mutex);
 }
@@ -1585,7 +1489,7 @@  connmgr_send_port_status(struct connmgr *mgr, struct ofconn *source,
     struct ofputil_port_status new_ps = { reason, *new_pp };
 
     struct ofconn *ofconn;
-    LIST_FOR_EACH (ofconn, node, &mgr->all_conns) {
+    LIST_FOR_EACH (ofconn, connmgr_node, &mgr->conns) {
         if (ofconn_receives_async_msg(ofconn, OAM_PORT_STATUS, reason)) {
             /* Before 1.5, OpenFlow specified that OFPT_PORT_MOD should not
              * generate OFPT_PORT_STATUS messages.  That requirement was a
@@ -1642,7 +1546,7 @@  connmgr_send_requestforward(struct connmgr *mgr, const struct ofconn *source,
 {
     struct ofconn *ofconn;
 
-    LIST_FOR_EACH (ofconn, node, &mgr->all_conns) {
+    LIST_FOR_EACH (ofconn, connmgr_node, &mgr->conns) {
         /* METER_MOD only supported in OF13 and up. */
         if (rf->reason == OFPRFR_METER_MOD
             && rconn_get_version(ofconn->rconn) < OFP13_VERSION) {
@@ -1669,7 +1573,7 @@  connmgr_send_flow_removed(struct connmgr *mgr,
 {
     struct ofconn *ofconn;
 
-    LIST_FOR_EACH (ofconn, node, &mgr->all_conns) {
+    LIST_FOR_EACH (ofconn, connmgr_node, &mgr->conns) {
         if (ofconn_receives_async_msg(ofconn, OAM_FLOW_REMOVED, fr->reason)) {
             /* Account flow expirations as replies to OpenFlow requests.  That
              * works because preventing OpenFlow requests from being processed
@@ -1701,7 +1605,7 @@  connmgr_send_table_status(struct connmgr *mgr,
     };
 
     struct ofconn *ofconn;
-    LIST_FOR_EACH (ofconn, node, &mgr->all_conns) {
+    LIST_FOR_EACH (ofconn, connmgr_node, &mgr->conns) {
         if (ofconn_receives_async_msg(ofconn, OAM_TABLE_STATUS, reason)) {
             struct ofpbuf *msg;
 
@@ -1722,7 +1626,7 @@  connmgr_send_async_msg(struct connmgr *mgr,
 {
     struct ofconn *ofconn;
 
-    LIST_FOR_EACH (ofconn, node, &mgr->all_conns) {
+    LIST_FOR_EACH (ofconn, connmgr_node, &mgr->conns) {
         enum ofputil_protocol protocol = ofconn_get_protocol(ofconn);
         if (protocol == OFPUTIL_P_NONE || !rconn_is_connected(ofconn->rconn)
             || ofconn->controller_id != am->controller_id
@@ -1794,10 +1698,12 @@  connmgr_get_max_probe_interval(const struct connmgr *mgr)
 {
     int max_probe_interval = 0;
 
-    const struct ofconn *ofconn;
-    HMAP_FOR_EACH (ofconn, hmap_node, &mgr->controllers) {
-        int probe_interval = rconn_get_probe_interval(ofconn->rconn);
-        max_probe_interval = MAX(max_probe_interval, probe_interval);
+    struct ofservice *ofservice;
+    HMAP_FOR_EACH (ofservice, hmap_node, &mgr->services) {
+        if (ofservice->type == OFCONN_PRIMARY) {
+            int probe_interval = ofservice->s.probe_interval;
+            max_probe_interval = MAX(max_probe_interval, probe_interval);
+        }
     }
     return max_probe_interval;
 }
@@ -1807,18 +1713,17 @@  connmgr_get_max_probe_interval(const struct connmgr *mgr)
 int
 connmgr_failure_duration(const struct connmgr *mgr)
 {
-    if (!connmgr_has_controllers(mgr)) {
-        return 0;
-    }
-
     int min_failure_duration = INT_MAX;
 
-    const struct ofconn *ofconn;
-    HMAP_FOR_EACH (ofconn, hmap_node, &mgr->controllers) {
-        int failure_duration = rconn_failure_duration(ofconn->rconn);
-        min_failure_duration = MIN(min_failure_duration, failure_duration);
+    struct ofservice *ofservice;
+    HMAP_FOR_EACH (ofservice, hmap_node, &mgr->services) {
+        if (ofservice->rconn) {
+            int failure_duration = rconn_failure_duration(ofservice->rconn);
+            min_failure_duration = MIN(min_failure_duration, failure_duration);
+        }
     }
-    return min_failure_duration;
+
+    return min_failure_duration != INT_MAX ? min_failure_duration : 0;
 }
 
 /* Returns true if at least one primary controller is connected (regardless of
@@ -1827,10 +1732,9 @@  connmgr_failure_duration(const struct connmgr *mgr)
 bool
 connmgr_is_any_controller_connected(const struct connmgr *mgr)
 {
-    const struct ofconn *ofconn;
-
-    HMAP_FOR_EACH (ofconn, hmap_node, &mgr->controllers) {
-        if (rconn_is_connected(ofconn->rconn)) {
+    struct ofservice *ofservice;
+    HMAP_FOR_EACH (ofservice, hmap_node, &mgr->services) {
+        if (ofservice->rconn && rconn_is_connected(ofservice->rconn)) {
             return true;
         }
     }
@@ -1844,8 +1748,9 @@  connmgr_is_any_controller_admitted(const struct connmgr *mgr)
 {
     const struct ofconn *ofconn;
 
-    HMAP_FOR_EACH (ofconn, hmap_node, &mgr->controllers) {
-        if (rconn_is_admitted(ofconn->rconn)) {
+    LIST_FOR_EACH (ofconn, connmgr_node, &mgr->conns) {
+        if (ofconn->type == OFCONN_PRIMARY
+            && rconn_is_admitted(ofconn->rconn)) {
             return true;
         }
     }
@@ -1972,41 +1877,133 @@  connmgr_count_hidden_rules(const struct connmgr *mgr)
  *
  * ofservice_reconfigure() must be called to fully configure the new
  * ofservice. */
-static int
+static void
 ofservice_create(struct connmgr *mgr, const char *target,
-                 uint32_t allowed_versions, uint8_t dscp)
+                 const struct ofproto_controller *c)
+    OVS_REQUIRES(ofproto_mutex)
 {
-    struct pvconn *pvconn;
-    int error = pvconn_open(target, allowed_versions, dscp, &pvconn);
-    if (error) {
-        return error;
+    struct pvconn *pvconn = NULL;
+    struct rconn *rconn = NULL;
+    if (!vconn_verify_name(target)) {
+        char *name = ofconn_make_name(mgr, target);
+        rconn = rconn_create(5, 8, c->dscp, c->allowed_versions);
+        rconn_connect(rconn, target, name);
+        free(name);
+    } else if (!pvconn_verify_name(target)) {
+        int error = pvconn_open(target, c->allowed_versions, c->dscp, &pvconn);
+        if (error) {
+            return;
+        }
+    } else {
+        VLOG_WARN_RL(&rl, "%s: unsupported controller \"%s\"",
+                     mgr->name, target);
+        return;
     }
 
     struct ofservice *ofservice = xzalloc(sizeof *ofservice);
-    hmap_insert(&mgr->services, &ofservice->node, hash_string(target, 0));
+    hmap_insert(&mgr->services, &ofservice->hmap_node, hash_string(target, 0));
+    ofservice->connmgr = mgr;
+    ofservice->target = xstrdup(target);
+    ovs_list_init(&ofservice->conns);
+    ofservice->type = rconn ? OFCONN_PRIMARY : OFCONN_SERVICE;
+    ofservice->rconn = rconn;
     ofservice->pvconn = pvconn;
-    ofservice->allowed_versions = allowed_versions;
+    ofservice->s = *c;
+    ofservice_reconfigure(ofservice, c);
 
-    return 0;
+    VLOG_INFO("%s: added %s controller \"%s\"",
+              mgr->name, ofconn_type_to_string(ofservice->type), target);
 }
 
 static void
-ofservice_destroy(struct connmgr *mgr, struct ofservice *ofservice)
+ofservice_close_all(struct ofservice *ofservice)
+    OVS_REQUIRES(ofproto_mutex)
 {
-    hmap_remove(&mgr->services, &ofservice->node);
-    pvconn_close(ofservice->pvconn);
+    struct ofconn *ofconn, *next;
+    LIST_FOR_EACH_SAFE (ofconn, next, ofservice_node, &ofservice->conns) {
+        ofconn_destroy(ofconn);
+    }
+}
+
+static void
+ofservice_destroy(struct ofservice *ofservice)
+    OVS_REQUIRES(ofproto_mutex)
+{
+    if (!ofservice) {
+        return;
+    }
+
+    ofservice_close_all(ofservice);
+
+    hmap_remove(&ofservice->connmgr->services, &ofservice->hmap_node);
+    free(ofservice->target);
+    if (ofservice->pvconn) {
+        pvconn_close(ofservice->pvconn);
+    }
+    if (ofservice->rconn) {
+        rconn_destroy(ofservice->rconn);
+    }
     free(ofservice);
 }
 
+static void
+ofservice_run(struct ofservice *ofservice)
+{
+    if (ofservice->pvconn) {
+        struct vconn *vconn;
+        int retval = pvconn_accept(ofservice->pvconn, &vconn);
+        if (!retval) {
+            /* Passing default value for creation of the rconn */
+            struct rconn *rconn = rconn_create(
+                ofservice->s.probe_interval, ofservice->s.max_backoff,
+                ofservice->s.dscp, ofservice->s.allowed_versions);
+            char *name = ofconn_make_name(ofservice->connmgr,
+                                          vconn_get_name(vconn));
+            rconn_connect_unreliably(rconn, vconn, name);
+            free(name);
+
+            ofconn_create(ofservice, rconn, OFCONN_SERVICE, &ofservice->s);
+        } else if (retval != EAGAIN) {
+            VLOG_WARN_RL(&rl, "accept failed (%s)", ovs_strerror(retval));
+        }
+    } else {
+        rconn_run(ofservice->rconn);
+
+        bool connected = rconn_is_connected(ofservice->rconn);
+        bool has_ofconn = !ovs_list_is_empty(&ofservice->conns);
+        if (connected && !has_ofconn) {
+            ofconn_create(ofservice, ofservice->rconn, OFCONN_PRIMARY,
+                          &ofservice->s);
+        }
+    }
+}
+
+static void
+ofservice_wait(struct ofservice *ofservice)
+{
+    if (ofservice->pvconn) {
+        pvconn_wait(ofservice->pvconn);
+    }
+}
+
 static void
 ofservice_reconfigure(struct ofservice *ofservice,
-                      const struct ofproto_controller *c)
+                      const struct ofproto_controller *settings)
+    OVS_REQUIRES(ofproto_mutex)
 {
-    ofservice->probe_interval = c->probe_interval;
-    ofservice->rate_limit = c->rate_limit;
-    ofservice->burst_limit = c->burst_limit;
-    ofservice->enable_async_msgs = c->enable_async_msgs;
-    ofservice->dscp = c->dscp;
+    /* If the allowed OpenFlow versions change, close all of the existing
+     * connections to allow them to reconnect and possibly negotiate a new
+     * version. */
+    if (ofservice->s.allowed_versions != settings->allowed_versions) {
+        ofservice_close_all(ofservice);
+    }
+
+    ofservice->s = *settings;
+
+    struct ofconn *ofconn;
+    LIST_FOR_EACH (ofconn, ofservice_node, &ofservice->conns) {
+        ofconn_reconfigure(ofconn, settings);
+    }
 }
 
 /* Finds and returns the ofservice within 'mgr' that has the given
@@ -2016,9 +2013,9 @@  ofservice_lookup(struct connmgr *mgr, const char *target)
 {
     struct ofservice *ofservice;
 
-    HMAP_FOR_EACH_WITH_HASH (ofservice, node, hash_string(target, 0),
+    HMAP_FOR_EACH_WITH_HASH (ofservice, hmap_node, hash_string(target, 0),
                              &mgr->services) {
-        if (!strcmp(pvconn_get_name(ofservice->pvconn), target)) {
+        if (!strcmp(ofservice->target, target)) {
             return ofservice;
         }
     }
@@ -2135,7 +2132,7 @@  ofmonitor_report(struct connmgr *mgr, struct rule *rule,
     }
 
     struct ofconn *ofconn;
-    LIST_FOR_EACH (ofconn, node, &mgr->all_conns) {
+    LIST_FOR_EACH (ofconn, connmgr_node, &mgr->conns) {
         if (ofconn->monitor_paused) {
             /* Only send NXFME_DELETED notifications for flows that were added
              * before we paused. */
@@ -2213,7 +2210,7 @@  ofmonitor_flush(struct connmgr *mgr)
 {
     struct ofconn *ofconn;
 
-    LIST_FOR_EACH (ofconn, node, &mgr->all_conns) {
+    LIST_FOR_EACH (ofconn, connmgr_node, &mgr->conns) {
         struct rconn_packet_counter *counter = ofconn->monitor_counter;
 
         struct ofpbuf *msg;
@@ -2268,7 +2265,7 @@  ofmonitor_run(struct connmgr *mgr)
 {
     ovs_mutex_lock(&ofproto_mutex);
     struct ofconn *ofconn;
-    LIST_FOR_EACH (ofconn, node, &mgr->all_conns) {
+    LIST_FOR_EACH (ofconn, connmgr_node, &mgr->conns) {
         if (ofmonitor_may_resume(ofconn)) {
             COVERAGE_INC(ofmonitor_resume);
             ofmonitor_resume(ofconn);
@@ -2282,7 +2279,7 @@  ofmonitor_wait(struct connmgr *mgr)
 {
     ovs_mutex_lock(&ofproto_mutex);
     struct ofconn *ofconn;
-    LIST_FOR_EACH (ofconn, node, &mgr->all_conns) {
+    LIST_FOR_EACH (ofconn, connmgr_node, &mgr->conns) {
         if (ofmonitor_may_resume(ofconn)) {
             poll_immediate_wake();
         }
@@ -2290,6 +2287,12 @@  ofmonitor_wait(struct connmgr *mgr)
     ovs_mutex_unlock(&ofproto_mutex);
 }
 
+const char *
+ofconn_type_to_string(enum ofconn_type type)
+{
+    return type == OFCONN_PRIMARY ? "primary" : "service";
+}
+
 void
 ofproto_async_msg_free(struct ofproto_async_msg *am)
 {
diff --git a/vswitchd/bridge.c b/vswitchd/bridge.c
index 83708ee51c7a..de5793dd03e4 100644
--- a/vswitchd/bridge.c
+++ b/vswitchd/bridge.c
@@ -2718,7 +2718,7 @@  ofp12_controller_role_to_str(enum ofp12_controller_role role)
         return "slave";
     case OFPCR12_ROLE_NOCHANGE:
     default:
-        return "*** INVALID ROLE ***";
+        return NULL;
     }
 }