[ovs-dev,v2,2/2] ovn-controller: Add a new thread in pinctrl module to handle packet-ins.
diff mbox series

Message ID 20190312163047.17884-1-nusiddiq@redhat.com
State Superseded
Headers show
Series
  • ovn-controller: Add a new thread in pinctrl module to process packet-ins
Related show

Commit Message

Numan Siddique March 12, 2019, 4:30 p.m. UTC
From: Numan Siddique <nusiddiq@redhat.com>

Prior to this patch, ovn-controller was single threaded and everytime the
poll_block() at the end of the main while() loop wakes up, it  processes
the whole SB DB and translates the logical flows to OF flows.

There are few issues with this -

  * For every packet-in received, ovn-controller does this translation
    resulting in unnecessary CPU cycles.

  * If the translation takes a lot of time, then the packet-in handling
    would get delayed. The delay in responses to DHCP requests could
    result in resending of these requests.

This patch addresses these issues by creating a new pthread in pinctrl module
to handle packet-ins. This thread doesn't access the Southbound DB IDL object.

Since some of the OVN actions - like dns_lookup, arp, put_arp, put_nd
require access to the Southbound DB contents and gARPs, periodic IPv6 RA
generation also requires the DB access, pinctrl_run() called by the main
ovn-controller thread accesses the Southbound DB IDL and builds the local
datastructures. pinctrl_handler thread accesses these data structures
in handling such requests. An ovs_mutex is used between the pinctr_run() and
the pinctrl_handler thread to protect these data structures.

Signed-off-by: Numan Siddique <nusiddiq@redhat.com>
---
 ovn/controller/pinctrl.c | 666 +++++++++++++++++++++++++++++++--------
 1 file changed, 528 insertions(+), 138 deletions(-)

Patch
diff mbox series

diff --git a/ovn/controller/pinctrl.c b/ovn/controller/pinctrl.c
index dac1fec20..5c0b1802b 100644
--- a/ovn/controller/pinctrl.c
+++ b/ovn/controller/pinctrl.c
@@ -27,6 +27,7 @@ 
 #include "lport.h"
 #include "nx-match.h"
 #include "ovn-controller.h"
+#include "latch.h"
 #include "lib/packets.h"
 #include "lib/sset.h"
 #include "openvswitch/ofp-actions.h"
@@ -48,48 +49,49 @@ 
 #include "openvswitch/poll-loop.h"
 #include "openvswitch/rconn.h"
 #include "socket-util.h"
+#include "seq.h"
 #include "timeval.h"
 #include "vswitch-idl.h"
 #include "lflow.h"
 
 VLOG_DEFINE_THIS_MODULE(pinctrl);
 
-/* OpenFlow connection to the switch. */
-static struct rconn *swconn_;
-
-/* Last seen sequence number for 'swconn_'.  When this differs from
- * rconn_get_connection_seqno(rconn), 'swconn_' has reconnected. */
-static unsigned int conn_seq_no;
-
 static void init_buffered_packets_map(void);
 static void destroy_buffered_packets_map(void);
 
-static void pinctrl_handle_put_mac_binding(struct rconn *swconn,
-                                           const struct flow *md,
+static void pinctrl_handle_put_mac_binding(const struct flow *md,
                                            const struct flow *headers,
-                                           bool is_arp);
+                                           bool is_arp)
+    OVS_REQUIRES(pinctrl_mutex);
 static void init_put_mac_bindings(void);
 static void destroy_put_mac_bindings(void);
 static void run_put_mac_bindings(
     struct ovsdb_idl_txn *ovnsb_idl_txn,
     struct ovsdb_idl_index *sbrec_datapath_binding_by_key,
     struct ovsdb_idl_index *sbrec_port_binding_by_key,
-    struct ovsdb_idl_index *sbrec_mac_binding_by_lport_ip);
+    struct ovsdb_idl_index *sbrec_mac_binding_by_lport_ip)
+    OVS_REQUIRES(pinctrl_mutex);
 static void wait_put_mac_bindings(struct ovsdb_idl_txn *ovnsb_idl_txn);
 static void flush_put_mac_bindings(void);
+static void buffer_put_mac_bindings(void);
+static void destroy_buffered_mac_bindings(void);
+static void send_mac_binding_buffered_pkts(struct rconn *swconn)
+    OVS_REQUIRES(pinctrl_mutex);
 
 static void init_send_garps(void);
 static void destroy_send_garps(void);
-static void send_garp_wait(void);
-static void send_garp_run(
-    struct rconn *swconn,
+static void send_garp_wait(long long int send_garp_time);
+static void send_garp_prepare(
     struct ovsdb_idl_index *sbrec_chassis_by_name,
     struct ovsdb_idl_index *sbrec_port_binding_by_datapath,
     struct ovsdb_idl_index *sbrec_port_binding_by_name,
     const struct ovsrec_bridge *,
     const struct sbrec_chassis *,
     const struct hmap *local_datapaths,
-    const struct sset *active_tunnels);
+    const struct sset *active_tunnels)
+    OVS_REQUIRES(pinctrl_mutex);
+static void send_garp_run(struct rconn *swconn, long long int *send_garp_time)
+    OVS_REQUIRES(pinctrl_mutex);
 static void pinctrl_handle_nd_na(struct rconn *swconn,
                                  const struct flow *ip_flow,
                                  const struct match *md,
@@ -109,26 +111,125 @@  static void pinctrl_handle_nd_ns(struct rconn *swconn,
                                  struct ofpbuf *userdata);
 static void init_ipv6_ras(void);
 static void destroy_ipv6_ras(void);
-static void ipv6_ra_wait(void);
-static void send_ipv6_ras(
-    struct rconn *swconn,
+static void ipv6_ra_wait(long long int send_ipv6_ra_time);
+static void prepare_ipv6_ras(
     struct ovsdb_idl_index *sbrec_port_binding_by_datapath,
     struct ovsdb_idl_index *sbrec_port_binding_by_name,
-    const struct hmap *local_datapaths);
-;
+    const struct hmap *local_datapaths)
+    OVS_REQUIRES(pinctrl_mutex);
+static void send_ipv6_ras(struct rconn *swconn,
+                          long long int *send_ipv6_ra_time)
+    OVS_REQUIRES(pinctrl_mutex);
+static bool may_inject_pkts(void);
 
 COVERAGE_DEFINE(pinctrl_drop_put_mac_binding);
 COVERAGE_DEFINE(pinctrl_drop_buffered_packets_map);
 
+/* pinctrl module creates a thread - pinctrl_handler to handle
+ * the packet-ins from ovs-vswitchd. Some of the OVN actions
+ * are translated to OF 'controller' actions. See include/ovn/actions.h
+ * for more details.
+ *
+ * pinctrl_handler thread doesn't access the Southbound IDL object. But
+ * some of the OVN actions which gets translated to 'controller'
+ * OF action, require data from Southbound DB.  Below are the details
+ * on how these actions are implemented.
+ *
+ * pinctrl_run() function is called by ovn-controller main thread.
+ * A Mutex - 'pinctrl_mutex' is used between the pinctrl_handler() thread
+ * and pinctrl_run().
+ *
+ *   - dns_lookup -     In order to do a DNS lookup, this action needs
+ *                      to access the 'DNS' table. pinctrl_run() builds a
+ *                      local DNS cache - 'dns_cache'. See sync_dns_cache()
+ *                      for more details.
+ *                      The function 'pinctrl_handle_dns_lookup()' (which is
+ *                      called with in the pinctrl_handler thread) looks into
+ *                      the local DNS cache to resolve the DNS requests.
+ *
+ *   - put_arp/put_nd - These actions stores the IPv4/IPv6 and MAC addresses
+ *                      in the 'MAC_Binding' table.
+ *                      The function 'pinctrl_handle_put_mac_binding()' (which
+ *                      is called with in the pinctrl_handler thread), stores
+ *                      the IPv4/IPv6 and MAC addresses in the
+ *                      hmap - put_mac_bindings.
+ *
+ *                      pinctrl_run(), reads these mac bindings from the hmap
+ *                      'put_mac_bindings' and writes to the 'MAC_Binding'
+ *                      table in the Southbound DB.
+ *
+ *   - arp/nd_ns      - These actions generate an ARP/IPv6 Neighbor solicit
+ *                      requests. The original packets are buffered and
+ *                      injected back when put_arp/put_nd actions are called.
+ *                      When pinctrl_run(), writes the mac bindings from the
+ *                      'put_mac_bindings' hmap to the MAC_Binding table in
+ *                      SB DB, it moves these mac bindings to another hmap -
+ *                      'buffered_mac_bindings'.
+ *
+ *                      The pinctrl_handler thread calls the function -
+ *                      send_mac_binding_buffered_pkts(), which uses
+ *                      the hmap - 'buffered_mac_bindings' and reinjects the
+ *                      buffered packets.
+ *
+ * pinctrl module also periodically sends IPv6 Router Solicitation requests
+ * and gARPs (for the router gateway IPs and configured NAT addresses).
+ *
+ * IPv6 RA handling - pinctrl_run() prepares the IPv6 RA information
+ *                    (see prepare_ipv6_ras()) in the shash 'ipv6_ras' by
+ *                    looking into the Southbound DB table - Port_Binding.
+ *
+ *                    pinctrl_handler thread sends the periodic IPv6 RAs using
+ *                    the shash - 'ipv6_ras'
+ *
+ * gARP handling    - pinctrl_run() prepares the gARP information
+ *                    (see send_garp_prepare()) in the shash 'send_garp_data'
+ *                    by looking into the Southbound DB table Port_Binding.
+ *
+ *                    pinctrl_handler() thread sends these gARPs using the
+ *                    shash 'send_garp_data'.
+ *
+ * Notification between pinctrl_handler() and pinctrl_run()
+ * -------------------------------------------------------
+ * 'struct seq' is used for notification between pinctrl_handler() thread
+ *  and pinctrl_run().
+ *  'pinctrl_handler_seq' is used by pinctrl_run() to
+ *  wake up pinctrl_handler thread from poll_block() if any changes happened
+ *  in 'send_garp_data', 'ipv6_ras' and 'buffered_mac_bindings' structures.
+ *
+ *  'pinctrl_main_seq' is used by pinctrl_handler() thread to wake up
+ *  the main thread from poll_block() when mac bindings needs to be updated
+ *  in the Southboubd DB.
+ * */
+
+static struct ovs_mutex pinctrl_mutex = OVS_MUTEX_INITIALIZER;
+static struct seq *pinctrl_handler_seq;
+static struct seq *pinctrl_main_seq;
+
+static void *pinctrl_handler(void *arg);
+
+struct pinctrl {
+    char *br_int_name;
+    pthread_t pinctrl_thread;
+    /* Latch to destroy the 'pinctrl_thread' */
+    struct latch pinctrl_thread_exit;
+};
+
+static struct pinctrl pinctrl;
+
 void
 pinctrl_init(void)
 {
-    swconn_ = rconn_create(5, 0, DSCP_DEFAULT, 1 << OFP13_VERSION);
-    conn_seq_no = 0;
     init_put_mac_bindings();
     init_send_garps();
     init_ipv6_ras();
     init_buffered_packets_map();
+    pinctrl.br_int_name = NULL;
+    pinctrl_handler_seq = seq_create();
+    pinctrl_main_seq = seq_create();
+
+    latch_init(&pinctrl.pinctrl_thread_exit);
+    pinctrl.pinctrl_thread = ovs_thread_create("ovn_pinctrl", pinctrl_handler,
+                                                &pinctrl);
 }
 
 static ovs_be32
@@ -149,7 +250,7 @@  pinctrl_setup(struct rconn *swconn)
      * change the miss_send_len to UINT16_MAX, so that we can enable
      * asynchronous messages. */
     queue_msg(swconn, ofpraw_alloc(OFPRAW_OFPT_GET_CONFIG_REQUEST,
-                           rconn_get_version(swconn), 0));
+                                   rconn_get_version(swconn), 0));
 
     /* Set a packet-in format that supports userdata.  */
     queue_msg(swconn,
@@ -339,6 +440,7 @@  pinctrl_find_buffered_packets(const struct in6_addr *ip, uint32_t hash)
     return NULL;
 }
 
+/* Called with in the pinctrl_handler thread context. */
 static int
 pinctrl_handle_buffered_packets(const struct flow *ip_flow,
                                 struct dp_packet *pkt_in,
@@ -376,6 +478,7 @@  pinctrl_handle_buffered_packets(const struct flow *ip_flow,
     return 0;
 }
 
+/* Called with in the pinctrl_handler thread context. */
 static void
 pinctrl_handle_arp(struct rconn *swconn, const struct flow *ip_flow,
                    struct dp_packet *pkt_in,
@@ -418,6 +521,7 @@  pinctrl_handle_arp(struct rconn *swconn, const struct flow *ip_flow,
     dp_packet_uninit(&packet);
 }
 
+/* Called with in the pinctrl_handler thread context. */
 static void
 pinctrl_handle_icmp(struct rconn *swconn, const struct flow *ip_flow,
                     struct dp_packet *pkt_in,
@@ -498,6 +602,7 @@  pinctrl_handle_icmp(struct rconn *swconn, const struct flow *ip_flow,
     dp_packet_uninit(&packet);
 }
 
+/* Called with in the pinctrl_handler thread context. */
 static void
 pinctrl_handle_tcp_reset(struct rconn *swconn, const struct flow *ip_flow,
                          struct dp_packet *pkt_in,
@@ -571,6 +676,7 @@  pinctrl_handle_tcp_reset(struct rconn *swconn, const struct flow *ip_flow,
     dp_packet_uninit(&packet);
 }
 
+/* Called with in the pinctrl_handler thread context. */
 static void
 pinctrl_handle_put_dhcp_opts(
     struct rconn *swconn,
@@ -936,6 +1042,7 @@  compose_out_dhcpv6_opts(struct ofpbuf *userdata,
     return true;
 }
 
+/* Called with in the pinctrl_handler thread context. */
 static void
 pinctrl_handle_put_dhcpv6_opts(
     struct rconn *swconn,
@@ -1135,12 +1242,87 @@  put_be32(struct ofpbuf *buf, ovs_be32 x)
     ofpbuf_put(buf, &x, sizeof x);
 }
 
+struct dns_data {
+    uint64_t *dps;
+    size_t n_dps;
+    struct smap records;
+    bool delete;
+};
+
+static struct shash dns_cache = SHASH_INITIALIZER(&dns_cache);
+
+/* Called by pinctrl_run(). Runs within the main ovn-controller
+ * thread context. */
+static void
+sync_dns_cache(const struct sbrec_dns_table *dns_table)
+    OVS_REQUIRES(pinctrl_mutex)
+{
+    struct shash_node *iter;
+    SHASH_FOR_EACH (iter, &dns_cache) {
+        struct dns_data *d = iter->data;
+        d->delete = true;
+    }
+
+    const struct sbrec_dns *sbrec_dns;
+    SBREC_DNS_TABLE_FOR_EACH (sbrec_dns, dns_table) {
+        const char *dns_id = smap_get(&sbrec_dns->external_ids, "dns_id");
+        if (!dns_id) {
+            continue;
+        }
+
+        struct dns_data *dns_data = shash_find_data(&dns_cache, dns_id);
+        if (!dns_data) {
+            dns_data = xmalloc(sizeof *dns_data);
+            smap_init(&dns_data->records);
+            shash_add(&dns_cache, dns_id, dns_data);
+            dns_data->n_dps = 0;
+            dns_data->dps = NULL;
+        } else {
+            free(dns_data->dps);
+        }
+
+        dns_data->delete = false;
+
+        if (!smap_equal(&dns_data->records, &sbrec_dns->records)) {
+            smap_clear(&dns_data->records);
+            smap_clone(&dns_data->records, &sbrec_dns->records);
+        }
+
+        dns_data->n_dps = sbrec_dns->n_datapaths;
+        dns_data->dps = xcalloc(dns_data->n_dps, sizeof(uint64_t));
+        for (size_t i = 0; i < sbrec_dns->n_datapaths; i++) {
+            dns_data->dps[i] = sbrec_dns->datapaths[i]->tunnel_key;
+        }
+    }
+
+    struct shash_node *next;
+    SHASH_FOR_EACH_SAFE (iter, next, &dns_cache) {
+        struct dns_data *d = iter->data;
+        if (d->delete) {
+            shash_delete(&dns_cache, iter);
+            free(d);
+        }
+    }
+}
+
+static void
+destroy_dns_cache(void)
+{
+    struct shash_node *iter, *next;
+    SHASH_FOR_EACH_SAFE (iter, next, &dns_cache) {
+        struct dns_data *d = iter->data;
+        shash_delete(&dns_cache, iter);
+        free(d);
+    }
+}
+
+/* Called with in the pinctrl_handler thread context. */
 static void
 pinctrl_handle_dns_lookup(
     struct rconn *swconn,
-    const struct sbrec_dns_table *dns_table,
     struct dp_packet *pkt_in, struct ofputil_packet_in *pin,
     struct ofpbuf *userdata, struct ofpbuf *continuation)
+    OVS_REQUIRES(pinctrl_mutex)
 {
     static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
     enum ofp_version version = rconn_get_version(swconn);
@@ -1231,13 +1413,13 @@  pinctrl_handle_dns_lookup(
     }
 
     uint64_t dp_key = ntohll(pin->flow_metadata.flow.metadata);
-    const struct sbrec_dns *sbrec_dns;
     const char *answer_ips = NULL;
-    SBREC_DNS_TABLE_FOR_EACH (sbrec_dns, dns_table) {
-        for (size_t i = 0; i < sbrec_dns->n_datapaths; i++) {
-            if (sbrec_dns->datapaths[i]->tunnel_key == dp_key) {
-                answer_ips = smap_get(&sbrec_dns->records,
-                                      ds_cstr(&query_name));
+    struct shash_node *iter;
+    SHASH_FOR_EACH (iter, &dns_cache) {
+        struct dns_data *d = iter->data;
+        for (size_t i = 0; i < d->n_dps; i++) {
+            if (d->dps[i] == dp_key) {
+                answer_ips = smap_get(&d->records, ds_cstr(&query_name));
                 if (answer_ips) {
                     break;
                 }
@@ -1390,10 +1572,9 @@  exit:
     dp_packet_uninit(pkt_out_ptr);
 }
 
+/* Called with in the pinctrl_handler thread context. */
 static void
-process_packet_in(struct rconn *swconn,
-                  const struct ofp_header *msg,
-                  const struct sbrec_dns_table *dns_table)
+process_packet_in(struct rconn *swconn, const struct ofp_header *msg)
 {
     static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
 
@@ -1431,8 +1612,10 @@  process_packet_in(struct rconn *swconn,
         break;
 
     case ACTION_OPCODE_PUT_ARP:
-        pinctrl_handle_put_mac_binding(swconn, &pin.flow_metadata.flow,
-                                       &headers, true);
+        ovs_mutex_lock(&pinctrl_mutex);
+        pinctrl_handle_put_mac_binding(&pin.flow_metadata.flow, &headers,
+                                       true);
+        ovs_mutex_unlock(&pinctrl_mutex);
         break;
 
     case ACTION_OPCODE_PUT_DHCP_OPTS:
@@ -1451,8 +1634,10 @@  process_packet_in(struct rconn *swconn,
         break;
 
     case ACTION_OPCODE_PUT_ND:
-        pinctrl_handle_put_mac_binding(swconn, &pin.flow_metadata.flow,
-                                       &headers, false);
+        ovs_mutex_lock(&pinctrl_mutex);
+        pinctrl_handle_put_mac_binding(&pin.flow_metadata.flow, &headers,
+                                       false);
+        ovs_mutex_unlock(&pinctrl_mutex);
         break;
 
     case ACTION_OPCODE_PUT_DHCPV6_OPTS:
@@ -1461,8 +1646,10 @@  process_packet_in(struct rconn *swconn,
         break;
 
     case ACTION_OPCODE_DNS_LOOKUP:
-        pinctrl_handle_dns_lookup(swconn, dns_table,
-                                  &packet, &pin, &userdata, &continuation);
+        ovs_mutex_lock(&pinctrl_mutex);
+        pinctrl_handle_dns_lookup(swconn, &packet, &pin, &userdata,
+                                  &continuation);
+        ovs_mutex_unlock(&pinctrl_mutex);
         break;
 
     case ACTION_OPCODE_LOG:
@@ -1496,9 +1683,10 @@  process_packet_in(struct rconn *swconn,
     }
 }
 
+/* Called with in the pinctrl_handler thread context. */
 static void
-pinctrl_recv(struct rconn *swconn, const struct sbrec_dns_table *dns_table,
-             const struct ofp_header *oh, enum ofptype type)
+pinctrl_recv(struct rconn *swconn, const struct ofp_header *oh,
+             enum ofptype type)
 {
     if (type == OFPTYPE_ECHO_REQUEST) {
         queue_msg(swconn, ofputil_encode_echo_reply(oh));
@@ -1510,7 +1698,7 @@  pinctrl_recv(struct rconn *swconn, const struct sbrec_dns_table *dns_table,
         config.miss_send_len = UINT16_MAX;
         set_switch_config(swconn, &config);
     } else if (type == OFPTYPE_PACKET_IN) {
-        process_packet_in(swconn, oh, dns_table);
+        process_packet_in(swconn, oh);
     } else {
         if (VLOG_IS_DBG_ENABLED()) {
             static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(30, 300);
@@ -1523,6 +1711,112 @@  pinctrl_recv(struct rconn *swconn, const struct sbrec_dns_table *dns_table,
     }
 }
 
+/* Called with in the main ovn-controller thread context. */
+
+static void
+notify_pinctrl_handler(void)
+{
+    seq_change(pinctrl_handler_seq);
+}
+
+/* Called with in the pinctrl_handler thread context. */
+static void
+notify_pinctrl_main(void)
+{
+    seq_change(pinctrl_main_seq);
+}
+
+/* pinctrl_handler pthread function. */
+static void *
+pinctrl_handler(void *arg_)
+{
+    struct pinctrl *pctrl = arg_;
+    /* OpenFlow connection to the switch. */
+    struct rconn *swconn;
+    /* Last seen sequence number for 'swconn'.  When this differs from
+     * rconn_get_connection_seqno(rconn), 'swconn' has reconnected. */
+    unsigned int conn_seq_no = 0;
+
+    char *br_int_name = NULL;
+    uint64_t new_seq;
+
+    /* Next IPV6 RA in seconds. */
+    static long long int send_ipv6_ra_time = LLONG_MAX;
+    /* Next GARP announcement in ms. */
+    static long long int send_garp_time = LLONG_MAX;
+
+    swconn = rconn_create(5, 0, DSCP_DEFAULT, 1 << OFP13_VERSION);
+
+    while (!latch_is_set(&pctrl->pinctrl_thread_exit)) {
+        if (pctrl->br_int_name) {
+            if (!br_int_name || strcmp(br_int_name, pctrl->br_int_name)) {
+                free(br_int_name);
+                br_int_name = xstrdup(pctrl->br_int_name);
+            }
+        }
+
+        if (br_int_name) {
+            char *target;
+
+            target = xasprintf("unix:%s/%s.mgmt", ovs_rundir(), br_int_name);
+            if (strcmp(target, rconn_get_target(swconn))) {
+                VLOG_INFO("%s: connecting to switch", target);
+                rconn_connect(swconn, target, target);
+            }
+            free(target);
+        } else {
+            rconn_disconnect(swconn);
+        }
+
+        rconn_run(swconn);
+        if (rconn_is_connected(swconn)) {
+            if (conn_seq_no != rconn_get_connection_seqno(swconn)) {
+                pinctrl_setup(swconn);
+                conn_seq_no = rconn_get_connection_seqno(swconn);
+            }
+
+            for (int i = 0; i < 50; i++) {
+                struct ofpbuf *msg = rconn_recv(swconn);
+                if (!msg) {
+                    break;
+                }
+
+                const struct ofp_header *oh = msg->data;
+                enum ofptype type;
+
+                ofptype_decode(&type, oh);
+                pinctrl_recv(swconn, oh, type);
+                ofpbuf_delete(msg);
+            }
+
+            if (may_inject_pkts()) {
+                ovs_mutex_lock(&pinctrl_mutex);
+                send_garp_run(swconn, &send_garp_time);
+                send_ipv6_ras(swconn, &send_ipv6_ra_time);
+                send_mac_binding_buffered_pkts(swconn);
+                ovs_mutex_unlock(&pinctrl_mutex);
+            }
+        }
+
+        buffered_packets_map_gc();
+        rconn_run_wait(swconn);
+        rconn_recv_wait(swconn);
+        send_garp_wait(send_garp_time);
+        ipv6_ra_wait(send_ipv6_ra_time);
+
+        new_seq = seq_read(pinctrl_handler_seq);
+        seq_wait(pinctrl_handler_seq, new_seq);
+
+        latch_wait(&pctrl->pinctrl_thread_exit);
+        poll_block();
+    }
+
+    free(br_int_name);
+    rconn_destroy(swconn);
+    return NULL;
+}
+
+/* Called by ovn-controller. */
 void
 pinctrl_run(struct ovsdb_idl_txn *ovnsb_idl_txn,
             struct ovsdb_idl_index *sbrec_chassis_by_name,
@@ -1537,58 +1831,33 @@  pinctrl_run(struct ovsdb_idl_txn *ovnsb_idl_txn,
             const struct hmap *local_datapaths,
             const struct sset *active_tunnels)
 {
-    char *target = xasprintf("unix:%s/%s.mgmt", ovs_rundir(), br_int->name);
-    if (strcmp(target, rconn_get_target(swconn_))) {
-        VLOG_INFO("%s: connecting to switch", target);
-        rconn_connect(swconn_, target, target);
-    }
-    free(target);
-
-    rconn_run(swconn_);
-
-    if (!rconn_is_connected(swconn_)) {
-        return;
-    }
-
-    if (conn_seq_no != rconn_get_connection_seqno(swconn_)) {
-        pinctrl_setup(swconn_);
-        conn_seq_no = rconn_get_connection_seqno(swconn_);
-        flush_put_mac_bindings();
-    }
-
-    /* Process a limited number of messages per call. */
-    for (int i = 0; i < 50; i++) {
-        struct ofpbuf *msg = rconn_recv(swconn_);
-        if (!msg) {
-            break;
+    ovs_mutex_lock(&pinctrl_mutex);
+    if (br_int && (!pinctrl.br_int_name || strcmp(pinctrl.br_int_name,
+                                                  br_int->name))) {
+        if (pinctrl.br_int_name) {
+            free(pinctrl.br_int_name);
         }
-
-        const struct ofp_header *oh = msg->data;
-        enum ofptype type;
-
-        ofptype_decode(&type, oh);
-        pinctrl_recv(swconn_, dns_table, oh, type);
-        ofpbuf_delete(msg);
+        pinctrl.br_int_name = xstrdup(br_int->name);
+        /* Notify pinctrl_handler that integration bridge is
+         * set/changed. */
+        notify_pinctrl_handler();
     }
-
     run_put_mac_bindings(ovnsb_idl_txn, sbrec_datapath_binding_by_key,
                          sbrec_port_binding_by_key,
                          sbrec_mac_binding_by_lport_ip);
-    send_garp_run(swconn_, sbrec_chassis_by_name,
-                  sbrec_port_binding_by_datapath,
-                  sbrec_port_binding_by_name, br_int, chassis,
-                  local_datapaths, active_tunnels);
-    send_ipv6_ras(swconn_, sbrec_port_binding_by_datapath,
-                  sbrec_port_binding_by_name, local_datapaths);
-    buffered_packets_map_gc();
+    send_garp_prepare(sbrec_chassis_by_name, sbrec_port_binding_by_datapath,
+                      sbrec_port_binding_by_name, br_int, chassis,
+                      local_datapaths, active_tunnels);
+    prepare_ipv6_ras(sbrec_port_binding_by_datapath,
+                     sbrec_port_binding_by_name, local_datapaths);
+    sync_dns_cache(dns_table);
+    ovs_mutex_unlock(&pinctrl_mutex);
 }
 
-/* Table of ipv6_ra_state structures, keyed on logical port name */
+/* Table of ipv6_ra_state structures, keyed on logical port name.
+ * Protected by pinctrl_mutex. */
 static struct shash ipv6_ras;
 
-/* Next IPV6 RA in seconds. */
-static long long int send_ipv6_ra_time;
-
 struct ipv6_ra_config {
     time_t min_interval;
     time_t max_interval;
@@ -1614,7 +1883,6 @@  static void
 init_ipv6_ras(void)
 {
     shash_init(&ipv6_ras);
-    send_ipv6_ra_time = LLONG_MAX;
 }
 
 static void
@@ -1727,6 +1995,7 @@  put_load(uint64_t value, enum mf_field_id dst, int ofs, int n_bits,
     bitwise_one(ofpact_set_field_mask(sf), sf->field->n_bytes, ofs, n_bits);
 }
 
+/* Called with in the pinctrl_handler thread context. */
 static long long int
 ipv6_ra_send(struct rconn *swconn, struct ipv6_ra_state *ra)
 {
@@ -1785,27 +2054,49 @@  ipv6_ra_send(struct rconn *swconn, struct ipv6_ra_state *ra)
     return ra->next_announce;
 }
 
+/* Called with in the pinctrl_handler thread context. */
 static void
-ipv6_ra_wait(void)
+ipv6_ra_wait(long long int send_ipv6_ra_time)
 {
-    poll_timer_wait_until(send_ipv6_ra_time);
+    /* Set the poll timer for next IPv6 RA only if IPv6 RAs needs to
+     * be sent. */
+    if (!shash_is_empty(&ipv6_ras)) {
+        poll_timer_wait_until(send_ipv6_ra_time);
+    }
 }
 
+/* Called with in the pinctrl_handler thread context. */
 static void
-send_ipv6_ras(struct rconn *swconn,
-              struct ovsdb_idl_index *sbrec_port_binding_by_datapath,
-              struct ovsdb_idl_index *sbrec_port_binding_by_name,
-              const struct hmap *local_datapaths)
+send_ipv6_ras(struct rconn *swconn, long long int *send_ipv6_ra_time)
+    OVS_REQUIRES(pinctrl_mutex)
 {
-    struct shash_node *iter, *iter_next;
+    *send_ipv6_ra_time = LLONG_MAX;
+    struct shash_node *iter;
+    SHASH_FOR_EACH (iter, &ipv6_ras) {
+        struct ipv6_ra_state *ra = iter->data;
+        long long int next_ra = ipv6_ra_send(swconn, ra);
+        if (*send_ipv6_ra_time > next_ra) {
+            *send_ipv6_ra_time = next_ra;
+        }
+    }
+}
 
-    send_ipv6_ra_time = LLONG_MAX;
+/* Called by pinctrl_run(). Runs with in the main ovn-controller
+ * thread context. */
+static void
+prepare_ipv6_ras(struct ovsdb_idl_index *sbrec_port_binding_by_datapath,
+                 struct ovsdb_idl_index *sbrec_port_binding_by_name,
+                 const struct hmap *local_datapaths)
+    OVS_REQUIRES(pinctrl_mutex)
+{
+    struct shash_node *iter, *iter_next;
 
     SHASH_FOR_EACH (iter, &ipv6_ras) {
         struct ipv6_ra_state *ra = iter->data;
         ra->delete_me = true;
     }
 
+    bool changed = false;
     const struct local_datapath *ld;
     HMAP_FOR_EACH (ld, hmap_node, local_datapaths) {
         struct sbrec_port_binding *target = sbrec_port_binding_index_init_row(
@@ -1844,6 +2135,7 @@  send_ipv6_ras(struct rconn *swconn,
                     ra->config->min_interval,
                     ra->config->max_interval);
                 shash_add(&ipv6_ras, pb->logical_port, ra);
+                changed = true;
             } else {
                 if (config->min_interval != ra->config->min_interval ||
                     config->max_interval != ra->config->max_interval)
@@ -1862,10 +2154,7 @@  send_ipv6_ras(struct rconn *swconn,
             ra->metadata = peer->datapath->tunnel_key;
             ra->delete_me = false;
 
-            long long int next_ra = ipv6_ra_send(swconn, ra);
-            if (send_ipv6_ra_time > next_ra) {
-                send_ipv6_ra_time = next_ra;
-            }
+            /* pinctrl_handler thread will send the IPv6 RAs. */
         }
         sbrec_port_binding_index_destroy_row(target);
     }
@@ -1878,26 +2167,39 @@  send_ipv6_ras(struct rconn *swconn,
             ipv6_ra_delete(ra);
         }
     }
+
+    if (changed) {
+        notify_pinctrl_handler();
+    }
+
 }
 
+/* Called by pinctrl_run(). Runs with in the main ovn-controller
+ * thread context. */
 void
 pinctrl_wait(struct ovsdb_idl_txn *ovnsb_idl_txn)
 {
     wait_put_mac_bindings(ovnsb_idl_txn);
-    rconn_run_wait(swconn_);
-    rconn_recv_wait(swconn_);
-    send_garp_wait();
-    ipv6_ra_wait();
+    int64_t new_seq = seq_read(pinctrl_main_seq);
+    seq_wait(pinctrl_main_seq, new_seq);
 }
 
+/* Called by ovn-controller. */
 void
 pinctrl_destroy(void)
 {
-    rconn_destroy(swconn_);
-    destroy_put_mac_bindings();
+    latch_set(&pinctrl.pinctrl_thread_exit);
+    pthread_join(pinctrl.pinctrl_thread, NULL);
+    latch_destroy(&pinctrl.pinctrl_thread_exit);
+    free(pinctrl.br_int_name);
     destroy_send_garps();
     destroy_ipv6_ras();
     destroy_buffered_packets_map();
+    destroy_put_mac_bindings();
+    destroy_buffered_mac_bindings();
+    destroy_dns_cache();
+    seq_destroy(pinctrl_main_seq);
+    seq_destroy(pinctrl_handler_seq);
 }
 
 /* Implementation of the "put_arp" and "put_nd" OVN actions.  These
@@ -1928,11 +2230,13 @@  struct put_mac_binding {
 
 /* Contains "struct put_mac_binding"s. */
 static struct hmap put_mac_bindings;
+static struct hmap buffered_mac_bindings;
 
 static void
 init_put_mac_bindings(void)
 {
     hmap_init(&put_mac_bindings);
+    hmap_init(&buffered_mac_bindings);
 }
 
 static void
@@ -1942,6 +2246,17 @@  destroy_put_mac_bindings(void)
     hmap_destroy(&put_mac_bindings);
 }
 
+static void
+destroy_buffered_mac_bindings(void)
+{
+    struct put_mac_binding *pmb;
+    HMAP_FOR_EACH_POP (pmb, hmap_node, &buffered_mac_bindings) {
+       free(pmb);
+    }
+
+    hmap_destroy(&buffered_mac_bindings);
+}
+
 static struct put_mac_binding *
 pinctrl_find_put_mac_binding(uint32_t dp_key, uint32_t port_key,
                              const struct in6_addr *ip_key, uint32_t hash)
@@ -1957,13 +2272,15 @@  pinctrl_find_put_mac_binding(uint32_t dp_key, uint32_t port_key,
     return NULL;
 }
 
+/* Called with in the pinctrl_handler thread context. */
 static void
-pinctrl_handle_put_mac_binding(struct rconn *swconn, const struct flow *md,
-                               const struct flow *headers, bool is_arp)
+pinctrl_handle_put_mac_binding(const struct flow *md,
+                               const struct flow *headers,
+                               bool is_arp)
+    OVS_REQUIRES(pinctrl_mutex)
 {
     uint32_t dp_key = ntohll(md->metadata);
     uint32_t port_key = md->regs[MFF_LOG_INPORT - MFF_REG0];
-    struct buffered_packets *bp;
     struct in6_addr ip_key;
 
     if (is_arp) {
@@ -1991,11 +2308,28 @@  pinctrl_handle_put_mac_binding(struct rconn *swconn, const struct flow *md,
     pmb->timestamp = time_msec();
     pmb->mac = headers->dl_src;
 
-    /* send queued pkts */
-    uint32_t bhash = hash_bytes(&ip_key, sizeof ip_key, 0);
-    bp = pinctrl_find_buffered_packets(&ip_key, bhash);
-    if (bp) {
-        buffered_send_packets(swconn, bp, &pmb->mac);
+    /* We can send the buffered packet once the main ovn-controller
+     * thread calls pinctrl_run() and it writes the mac_bindings stored
+     * in 'put_mac_bindings' hmap into the Southbound MAC_Binding table. */
+    notify_pinctrl_main();
+}
+
+/* Called with in the pinctrl_handler thread context. */
+static void
+send_mac_binding_buffered_pkts(struct rconn *swconn)
+    OVS_REQUIRES(pinctrl_mutex)
+{
+    struct put_mac_binding *pmb;
+    struct buffered_packets *bp;
+    HMAP_FOR_EACH_POP (pmb, hmap_node, &buffered_mac_bindings) {
+        uint32_t bhash = hash_bytes(&pmb->ip_key, sizeof pmb->ip_key, 0);
+
+        bp = pinctrl_find_buffered_packets(&pmb->ip_key, bhash);
+        if (bp) {
+            buffered_send_packets(swconn, bp, &pmb->mac);
+        }
+
+        free(pmb);
     }
 }
 
@@ -2065,11 +2399,14 @@  run_put_mac_binding(struct ovsdb_idl_txn *ovnsb_idl_txn,
     ds_destroy(&ip_s);
 }
 
+/* Called by pinctrl_run(). Runs with in the main ovn-controller
+ * thread context. */
 static void
 run_put_mac_bindings(struct ovsdb_idl_txn *ovnsb_idl_txn,
                      struct ovsdb_idl_index *sbrec_datapath_binding_by_key,
                      struct ovsdb_idl_index *sbrec_port_binding_by_key,
                      struct ovsdb_idl_index *sbrec_mac_binding_by_lport_ip)
+    OVS_REQUIRES(pinctrl_mutex)
 {
     if (!ovnsb_idl_txn) {
         return;
@@ -2082,7 +2419,14 @@  run_put_mac_bindings(struct ovsdb_idl_txn *ovnsb_idl_txn,
                             sbrec_mac_binding_by_lport_ip,
                             pmb);
     }
-    flush_put_mac_bindings();
+
+    /* Move the mac bindings from 'put_mac_bindings' hmap to
+     * 'buffered_mac_bindings' and notify the pinctrl_handler.
+     * pinctrl_handler will reinject the buffered packets. */
+    if (!hmap_is_empty(&put_mac_bindings)) {
+        buffer_put_mac_bindings();
+        notify_pinctrl_handler();
+    }
 }
 
 static void
@@ -2093,6 +2437,17 @@  wait_put_mac_bindings(struct ovsdb_idl_txn *ovnsb_idl_txn)
     }
 }
 
+static void
+buffer_put_mac_bindings(void)
+{
+    struct put_mac_binding *pmb;
+    HMAP_FOR_EACH_POP (pmb, hmap_node, &put_mac_bindings) {
+        uint32_t hash = hash_bytes(&pmb->ip_key, sizeof pmb->ip_key,
+                                   hash_2words(pmb->dp_key, pmb->port_key));
+        hmap_insert(&buffered_mac_bindings, &pmb->hmap_node, hash);
+    }
+}
+
 static void
 flush_put_mac_bindings(void)
 {
@@ -2119,17 +2474,13 @@  struct garp_data {
     uint32_t port_key;           /* Port to inject the GARP into. */
 };
 
-/* Contains GARPs to be sent. */
+/* Contains GARPs to be sent. Protected by pinctrl_mutex*/
 static struct shash send_garp_data;
 
-/* Next GARP announcement in ms. */
-static long long int send_garp_time;
-
 static void
 init_send_garps(void)
 {
     shash_init(&send_garp_data);
-    send_garp_time = LLONG_MAX;
 }
 
 static void
@@ -2138,6 +2489,7 @@  destroy_send_garps(void)
     shash_destroy_free_data(&send_garp_data);
 }
 
+/* Runs with in the main ovn-controller thread context. */
 static void
 add_garp(const char *name, const struct eth_addr ea, ovs_be32 ip,
          uint32_t dp_key, uint32_t port_key)
@@ -2150,6 +2502,10 @@  add_garp(const char *name, const struct eth_addr ea, ovs_be32 ip,
     garp->dp_key = dp_key;
     garp->port_key = port_key;
     shash_add(&send_garp_data, name, garp);
+
+    /* Notify pinctrl_handler so that it can wakeup and process
+     * these GARP requests. */
+    notify_pinctrl_handler();
 }
 
 /* Add or update a vif for which GARPs need to be announced. */
@@ -2221,11 +2577,14 @@  send_garp_delete(const char *lport)
 {
     struct garp_data *garp = shash_find_and_delete(&send_garp_data, lport);
     free(garp);
+    notify_pinctrl_handler();
 }
 
+/* Called with in the pinctrl_handler thread context. */
 static long long int
 send_garp(struct rconn *swconn, struct garp_data *garp,
           long long int current_time)
+    OVS_REQUIRES(pinctrl_mutex)
 {
     if (current_time < garp->announce_time) {
         return garp->announce_time;
@@ -2521,20 +2880,48 @@  get_nat_addresses_and_keys(struct ovsdb_idl_index *sbrec_chassis_by_name,
 }
 
 static void
-send_garp_wait(void)
+send_garp_wait(long long int send_garp_time)
+{
+    /* Set the poll timer for next garp only if there is garp data to
+     * be sent. */
+    if (!shash_is_empty(&send_garp_data)) {
+        poll_timer_wait_until(send_garp_time);
+    }
+}
+
+/* Called with in the pinctrl_handler thread context. */
+static void
+send_garp_run(struct rconn *swconn, long long int *send_garp_time)
+    OVS_REQUIRES(pinctrl_mutex)
 {
-    poll_timer_wait_until(send_garp_time);
+    if (shash_is_empty(&send_garp_data)) {
+        return;
+    }
+
+    /* Send GARPs, and update the next announcement. */
+    struct shash_node *iter;
+    long long int current_time = time_msec();
+    *send_garp_time = LLONG_MAX;
+    SHASH_FOR_EACH (iter, &send_garp_data) {
+        long long int next_announce = send_garp(swconn, iter->data,
+                                                current_time);
+        if (*send_garp_time > next_announce) {
+            *send_garp_time = next_announce;
+        }
+    }
 }
 
+/* Called by pinctrl_run(). Runs with in the main ovn-controller
+ * thread context. */
 static void
-send_garp_run(struct rconn *swconn,
-              struct ovsdb_idl_index *sbrec_chassis_by_name,
-              struct ovsdb_idl_index *sbrec_port_binding_by_datapath,
-              struct ovsdb_idl_index *sbrec_port_binding_by_name,
-              const struct ovsrec_bridge *br_int,
-              const struct sbrec_chassis *chassis,
-              const struct hmap *local_datapaths,
-              const struct sset *active_tunnels)
+send_garp_prepare(struct ovsdb_idl_index *sbrec_chassis_by_name,
+                  struct ovsdb_idl_index *sbrec_port_binding_by_datapath,
+                  struct ovsdb_idl_index *sbrec_port_binding_by_name,
+                  const struct ovsrec_bridge *br_int,
+                  const struct sbrec_chassis *chassis,
+                  const struct hmap *local_datapaths,
+                  const struct sset *active_tunnels)
+    OVS_REQUIRES(pinctrl_mutex)
 {
     struct sset localnet_vifs = SSET_INITIALIZER(&localnet_vifs);
     struct sset local_l3gw_ports = SSET_INITIALIZER(&local_l3gw_ports);
@@ -2582,16 +2969,8 @@  send_garp_run(struct rconn *swconn,
         }
     }
 
-    /* Send GARPs, and update the next announcement. */
-    long long int current_time = time_msec();
-    send_garp_time = LLONG_MAX;
-    SHASH_FOR_EACH (iter, &send_garp_data) {
-        long long int next_announce = send_garp(swconn, iter->data,
-                                                current_time);
-        if (send_garp_time > next_announce) {
-            send_garp_time = next_announce;
-        }
-    }
+    /* pinctrl_handler thread will send the GARPs. */
+
     sset_destroy(&localnet_vifs);
     sset_destroy(&local_l3gw_ports);
 
@@ -2606,6 +2985,14 @@  send_garp_run(struct rconn *swconn,
     sset_destroy(&nat_ip_keys);
 }
 
+static bool
+may_inject_pkts(void)
+{
+    return (!shash_is_empty(&ipv6_ras) ||
+            !shash_is_empty(&send_garp_data) ||
+            !hmap_is_empty(&buffered_mac_bindings));
+}
+
 static void
 reload_metadata(struct ofpbuf *ofpacts, const struct match *md)
 {
@@ -2642,6 +3029,7 @@  reload_metadata(struct ofpbuf *ofpacts, const struct match *md)
     }
 }
 
+/* Called with in the pinctrl_handler thread context. */
 static void
 pinctrl_handle_nd_na(struct rconn *swconn, const struct flow *ip_flow,
                      const struct match *md,
@@ -2674,6 +3062,7 @@  pinctrl_handle_nd_na(struct rconn *swconn, const struct flow *ip_flow,
     dp_packet_uninit(&packet);
 }
 
+/* Called with in the pinctrl_handler thread context. */
 static void
 pinctrl_handle_nd_ns(struct rconn *swconn, const struct flow *ip_flow,
                      struct dp_packet *pkt_in,
@@ -2700,6 +3089,7 @@  pinctrl_handle_nd_ns(struct rconn *swconn, const struct flow *ip_flow,
     dp_packet_uninit(&packet);
 }
 
+/* Called with in the pinctrl_handler thread context. */
 static void
 pinctrl_handle_put_nd_ra_opts(
     struct rconn *swconn,