
[ovs-dev] ovn-controller: Add a new thread in pinctrl module to handle packet-ins.

Message ID: 20190311162847.28294-1-nusiddiq@redhat.com
State: Superseded
Series: [ovs-dev] ovn-controller: Add a new thread in pinctrl module to handle packet-ins.

Commit Message

Numan Siddique March 11, 2019, 4:28 p.m. UTC
From: Numan Siddique <nusiddiq@redhat.com>

Prior to this patch, ovn-controller was single threaded: every time the
poll_block() at the end of the main while() loop wakes up, it processes
the whole SB DB and translates the logical flows to OF flows.

There are a few issues with this:

  * For every packet-in received, ovn-controller does this translation,
    resulting in unnecessary CPU cycles.

  * If the translation takes a lot of time, then packet-in handling
    gets delayed. Delayed responses to DHCP requests could result in
    clients resending those requests.

This patch addresses these issues by creating a new pthread in the pinctrl
module to handle packet-ins. This thread doesn't access the Southbound DB
IDL object.

Since some of the OVN actions - like dns_lookup, arp, put_arp and put_nd -
require access to the Southbound DB contents, and since gARP and periodic
IPv6 RA generation also require DB access, pinctrl_run(), called by the
main ovn-controller thread, accesses the Southbound DB IDL and builds the
local data structures. The pinctrl_handler thread accesses these data
structures when handling such requests. An ovs_mutex is used between
pinctrl_run() and the pinctrl_handler thread to protect these data
structures.

Signed-off-by: Numan Siddique <nusiddiq@redhat.com>
---
 ovn/controller/pinctrl.c | 681 ++++++++++++++++++++++++++++++---------
 1 file changed, 532 insertions(+), 149 deletions(-)
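
As a rough sketch of the synchronization the commit message describes (not
the actual patch code): the main thread fills the shared structures under a
mutex and bumps a seq, and the handler thread consumes them under the same
mutex. The 'ovs_mutex', 'seq' and 'latch' primitives are existing OVS
library facilities; the function names and elided bodies below are
illustrative.

/* Sketch: the main thread prepares shared data under pinctrl_mutex and
 * wakes the handler thread through a seq; the handler thread consumes
 * the data under the same mutex. */
#include <stdint.h>
#include "latch.h"
#include "openvswitch/poll-loop.h"
#include "ovs-thread.h"
#include "seq.h"

static struct ovs_mutex pinctrl_mutex = OVS_MUTEX_INITIALIZER;
static struct seq *pinctrl_handler_seq;   /* Created during init. */
static struct latch pinctrl_exit_latch;   /* Set to stop the thread. */

/* Called from the main ovn-controller loop (cf. pinctrl_run()). */
void
pinctrl_run_sketch(void)
{
    ovs_mutex_lock(&pinctrl_mutex);
    /* ... read the SB IDL and rebuild 'send_garp_data', 'ipv6_ras',
     * 'dns_cache', etc. ... */
    ovs_mutex_unlock(&pinctrl_mutex);

    seq_change(pinctrl_handler_seq);      /* Wake the handler thread. */
}

/* Body of the pinctrl_handler thread. */
void *
pinctrl_handler_sketch(void *arg)
{
    (void) arg;

    while (!latch_is_set(&pinctrl_exit_latch)) {
        uint64_t new_seq = seq_read(pinctrl_handler_seq);

        ovs_mutex_lock(&pinctrl_mutex);
        /* ... send gARPs and IPv6 RAs, reinject buffered packets ... */
        ovs_mutex_unlock(&pinctrl_mutex);

        /* Sleep until the main thread changes the seq again. */
        seq_wait(pinctrl_handler_seq, new_seq);
        latch_wait(&pinctrl_exit_latch);
        poll_block();
    }
    return NULL;
}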

Comments

Han Zhou March 11, 2019, 10:11 p.m. UTC | #1
On Mon, Mar 11, 2019 at 9:29 AM <nusiddiq@redhat.com> wrote:
>
> [...]

Thanks Numan for addressing this problem. It is very good that the
implementation doesn't require a separate IDL for SB access. Here are
some generic comments.

For operations that don't rely on the SB DB, such as OVS echo probes, ACL
logging and DHCP, in theory they can be handled completely in
parallel. However, in this patch the pinctrl_mutex locks the whole of
pinctrl_handler. I think the lock can be relaxed to exclude the
operations that don't need SB DB access.
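
As a rough sketch of that relaxation (the ACTION_OPCODE_* names follow
include/ovn/actions.h, but decode_opcode() and the handle_*() helpers here
are hypothetical stand-ins):

/* Sketch: take pinctrl_mutex only for packet-ins whose handling reads
 * the SB-derived local caches. */
#include "ovn/actions.h"            /* enum action_opcode */
#include "openvswitch/rconn.h"

static enum action_opcode decode_opcode(const struct ofp_header *);
static void handle_dhcp_opts(struct rconn *, const struct ofp_header *);
static void handle_dns_lookup(struct rconn *, const struct ofp_header *);

static void
process_packet_in_sketch(struct rconn *swconn, const struct ofp_header *msg)
{
    switch (decode_opcode(msg)) {
    case ACTION_OPCODE_PUT_DHCP_OPTS:
        /* DHCP replies are built purely from the packet and the action's
         * userdata: no SB data, so no lock is needed. */
        handle_dhcp_opts(swconn, msg);
        break;

    case ACTION_OPCODE_DNS_LOOKUP:
        /* DNS lookups read 'dns_cache', which the main thread rebuilds
         * from the SB DNS table, so the lock is required. */
        ovs_mutex_lock(&pinctrl_mutex);
        handle_dns_lookup(swconn, msg);
        ovs_mutex_unlock(&pinctrl_mutex);
        break;

    default:
        /* ... remaining opcodes ... */
        break;
    }
}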

For tasks that need READONLY access to the SB DB, the main thread reads
from the DB and notifies the pinctrl thread to process the changed data.
This synchronization is definitely required, and the implementation looks
good to me.

For tasks that require WRITE access to the SB DB, such as put_arp, this
patch still triggers a recompute in the main thread, and the two
threads would still block each other. This is because when the main
thread is woken up through pinctrl_main_seq, it recomputes flows
again even if there is no change in the SB DB. In fact this can be
solved by the first patches of the incremental processing series
(https://github.com/hzhou8/ovs/commit/2bc0e2423fdcff21db4d607f3164b0e0c62cd660).
That patch has no real incremental processing yet, but since it is
abstracted with a dependency model, it can avoid flow computation
when it is not needed. Do you think it would be good to combine it with
this multi-threading patch, to avoid put_arp triggering a flow
computation that blocks the put_arp operation? Sorry for bringing
the I-P series back if it is annoying. I think the major reason it was
rejected earlier was the concern about the maintenance effort for the
dependency graph and change-handler implementations. However, since the
first patches of I-P only build the coarse-grained dependency model
without any change handlers, we can get the benefit without worrying
about maintenance, although I might be biased. (P.S. I am still eager to
work on DDlog for ovn-controller, but I guess it will just take quite
some time to be ready.)
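
For reference, the put_arp/put_nd write path being discussed looks roughly
like this (a sketch following the patch description; 'struct
mac_binding_data' and the put_mac_bindings_add() / 
flush_put_mac_bindings_to_sb() helpers are illustrative):

/* Sketch of the put_arp/put_nd write path: the handler thread queues the
 * binding locally and wakes the main thread, which owns the SB IDL and
 * its transaction. */
#include "ovsdb-idl.h"
#include "seq.h"

struct mac_binding_data;                  /* Hypothetical binding record. */
static struct seq *pinctrl_main_seq;      /* Wakes the main thread. */
static void put_mac_bindings_add(const struct mac_binding_data *);
static void flush_put_mac_bindings_to_sb(struct ovsdb_idl_txn *);

/* pinctrl_handler thread, on a put_arp/put_nd packet-in. */
static void
pinctrl_handle_put_mac_binding_sketch(const struct mac_binding_data *mb)
{
    ovs_mutex_lock(&pinctrl_mutex);
    put_mac_bindings_add(mb);             /* Hypothetical: insert into hmap. */
    ovs_mutex_unlock(&pinctrl_mutex);

    /* Wake the main thread out of poll_block() so that pinctrl_run() can
     * write the binding into the SB MAC_Binding table. */
    seq_change(pinctrl_main_seq);
}

/* Main thread, inside pinctrl_run(), with an open SB transaction. */
static void
run_put_mac_bindings_sketch(struct ovsdb_idl_txn *ovnsb_idl_txn)
{
    ovs_mutex_lock(&pinctrl_mutex);
    /* Write each queued binding into the SB DB, then move it to
     * 'buffered_mac_bindings' so the handler thread can reinject the
     * packets buffered by arp/nd_ns. */
    flush_put_mac_bindings_to_sb(ovnsb_idl_txn);   /* Hypothetical. */
    ovs_mutex_unlock(&pinctrl_mutex);
}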

Thanks,
Han
Mark Michelson March 12, 2019, 12:46 p.m. UTC | #2
On 3/11/19 6:11 PM, Han Zhou wrote:
> On Mon, Mar 11, 2019 at 9:29 AM <nusiddiq@redhat.com> wrote:
>> [...]
> 
> Thanks Numan for addressing this problem. It is very good that the
> implementation doesn't require a separate IDL for SB access. Here are
> some generic comments.
> 
> For operations that don't rely on the SB DB, such as OVS echo probes, ACL
> logging and DHCP, in theory they can be handled completely in
> parallel. However, in this patch the pinctrl_mutex locks the whole of
> pinctrl_handler. I think the lock can be relaxed to exclude the
> operations that don't need SB DB access.
> 
> For tasks that need READONLY access to the SB DB, the main thread reads
> from the DB and notifies the pinctrl thread to process the changed data.
> This synchronization is definitely required, and the implementation looks
> good to me.
> 
> For tasks that require WRITE access to the SB DB, such as put_arp, this
> patch still triggers a recompute in the main thread, and the two
> threads would still block each other. This is because when the main
> thread is woken up through pinctrl_main_seq, it recomputes flows
> again even if there is no change in the SB DB. In fact this can be
> solved by the first patches of the incremental processing series
> (https://github.com/hzhou8/ovs/commit/2bc0e2423fdcff21db4d607f3164b0e0c62cd660).
> That patch has no real incremental processing yet, but since it is
> abstracted with a dependency model, it can avoid flow computation
> when it is not needed. Do you think it would be good to combine it with
> this multi-threading patch, to avoid put_arp triggering a flow
> computation that blocks the put_arp operation?

I think the idea of not recomputing flows if the SB DB has not changed is
a fantastic one, and it should be implemented whether or not pinctrl is
made multithreaded. If the goal is *only* to avoid recomputing flows when
the SB DB has not changed, then it could potentially be done with a
simpler set of patches, since there is no goal of implementing an I-P
engine.
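
A minimal form of that could track the IDL sequence number, e.g. the sketch
below (ovsdb_idl_get_seqno() is a real IDL call; the surrounding structure
is simplified):

/* Sketch: recompute flows only when the SB IDL contents actually changed,
 * so wakeups caused only by pinctrl_main_seq leave the flow table alone. */
#include "ovsdb-idl.h"

static unsigned int last_sb_seqno;

static void
maybe_compute_flows(struct ovsdb_idl *ovnsb_idl)
{
    unsigned int seqno = ovsdb_idl_get_seqno(ovnsb_idl);

    if (seqno != last_sb_seqno) {
        last_sb_seqno = seqno;
        /* ... translate logical flows to OF flows ... */
    }
    /* A real implementation must also consider the other inputs (OVS DB
     * contents, local port bindings) that can invalidate flows, which is
     * what the I-P dependency model tracks. */
}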

I was going to suggest that it may be possible to avoid such a patch by 
having the pinctrl_handle_mac_binding() function detect if local data 
has changed before notifying the main thread. However, I don't think 
this will work if MAC bindings move between hypervisors.

Numan Siddique March 12, 2019, 4:22 p.m. UTC | #3
On Tue, Mar 12, 2019 at 6:16 PM Mark Michelson <mmichels@redhat.com> wrote:

> On 3/11/19 6:11 PM, Han Zhou wrote:
> > On Mon, Mar 11, 2019 at 9:29 AM <nusiddiq@redhat.com> wrote:
> >> [...]
> >
> > Thanks Numan for addressing this problem. It is very good that the
> > implementation doesn't require a separate IDL for SB access. Here are
> > some generic comments.
> >
> > For operations that don't rely on the SB DB, such as OVS echo probes, ACL
> > logging and DHCP, in theory they can be handled completely in
> > parallel. However, in this patch the pinctrl_mutex locks the whole of
> > pinctrl_handler. I think the lock can be relaxed to exclude the
> > operations that don't need SB DB access.
>

I did think of it earlier, but went with this approach to be on the safe
side. I will address this in v2 as per your suggestion.


> >
> > For tasks that need READONLY access to the SB DB, the main thread reads
> > from the DB and notifies the pinctrl thread to process the changed data.
> > This synchronization is definitely required, and the implementation looks
> > good to me.
> >
> > For tasks that require WRITE access to the SB DB, such as put_arp, this
> > patch still triggers a recompute in the main thread, and the two
> > threads would still block each other. This is because when the main
> > thread is woken up through pinctrl_main_seq, it recomputes flows
> > again even if there is no change in the SB DB. In fact this can be
> > solved by the first patches of the incremental processing series
> > (https://github.com/hzhou8/ovs/commit/2bc0e2423fdcff21db4d607f3164b0e0c62cd660).
> > That patch has no real incremental processing yet, but since it is
> > abstracted with a dependency model, it can avoid flow computation
> > when it is not needed. Do you think it would be good to combine it with
> > this multi-threading patch, to avoid put_arp triggering a flow
> > computation that blocks the put_arp operation?
>
> I think the idea of not recomputing flows if the SB DB has not changed is
> a fantastic one, and it should be implemented whether or not pinctrl is
> made multithreaded. If the goal is *only* to avoid recomputing flows when
> the SB DB has not changed, then it could potentially be done with a
> simpler set of patches, since there is no goal of implementing an I-P
> engine.
>
>
Thanks Han and Mark for the comments.
I agree with Mark. I think we should address that, and it would definitely
help in reducing ovn-controller's CPU usage. But I think it can be
addressed as a separate patch, since this patch is not changing that
behavior. I will look into the patch you pointed out and see if I can
test it out.

Does this sound good?

Thanks
Numan

Han Zhou March 12, 2019, 5:49 p.m. UTC | #4
On Tue, Mar 12, 2019 at 9:22 AM Numan Siddique <nusiddiq@redhat.com> wrote:
>
>
>
> On Tue, Mar 12, 2019 at 6:16 PM Mark Michelson <mmichels@redhat.com> wrote:
>>
>> On 3/11/19 6:11 PM, Han Zhou wrote:
>> > On Mon, Mar 11, 2019 at 9:29 AM <nusiddiq@redhat.com> wrote:
>> >> [...]
>> >
>> > Thanks Numan for addressing this problem. It is very good that the
>> > implementation doesn't require a separate IDL for SB access. Here are
>> > some generic comments.
>> >
>> > For operations that don't rely on the SB DB, such as OVS echo probes, ACL
>> > logging and DHCP, in theory they can be handled completely in
>> > parallel. However, in this patch the pinctrl_mutex locks the whole of
>> > pinctrl_handler. I think the lock can be relaxed to exclude the
>> > operations that don't need SB DB access.
>
>
> I did think of it earlier, but went with this approach to be on the safe
> side. I will address this in v2 as per your suggestion.
>
>>
>> >
>> > For tasks that need READONLY access to the SB DB, the main thread reads
>> > from the DB and notifies the pinctrl thread to process the changed data.
>> > This synchronization is definitely required, and the implementation looks
>> > good to me.
>> >
>> > For tasks that require WRITE access to the SB DB, such as put_arp, this
>> > patch still triggers a recompute in the main thread, and the two
>> > threads would still block each other. This is because when the main
>> > thread is woken up through pinctrl_main_seq, it recomputes flows
>> > again even if there is no change in the SB DB. In fact this can be
>> > solved by the first patches of the incremental processing series
>> > (https://github.com/hzhou8/ovs/commit/2bc0e2423fdcff21db4d607f3164b0e0c62cd660).
>> > That patch has no real incremental processing yet, but since it is
>> > abstracted with a dependency model, it can avoid flow computation
>> > when it is not needed. Do you think it would be good to combine it with
>> > this multi-threading patch, to avoid put_arp triggering a flow
>> > computation that blocks the put_arp operation?
>>
>> I think the idea of not recomputing flows if the SB DB has not changed is
>> a fantastic one, and it should be implemented whether or not pinctrl is
>> made multithreaded. If the goal is *only* to avoid recomputing flows when
>> the SB DB has not changed, then it could potentially be done with a
>> simpler set of patches, since there is no goal of implementing an I-P
>> engine.
>>

It is not only about SB DB changes, but about all input changes that
can trigger flow computation. The first patches of the I-P engine
accomplish just that, though it is true that they could be simpler; at
least the flow table persistence is not necessary for this purpose. I
will try to remove the unnecessary parts from that patch.

>
> Thanks Han and Mark for the comments.
> I agree with Mark. I think we should address that, and it would definitely
> help in reducing ovn-controller's CPU usage. But I think it can be
> addressed as a separate patch, since this patch is not changing that
> behavior. I will look into the patch you pointed out and see if I can
> test it out.
>
> Does this sound good?

Agree. I can send a separate one.


Patch

diff --git a/ovn/controller/pinctrl.c b/ovn/controller/pinctrl.c
index 100a20ff2..857cb5d51 100644
--- a/ovn/controller/pinctrl.c
+++ b/ovn/controller/pinctrl.c
@@ -27,6 +27,7 @@ 
 #include "lport.h"
 #include "nx-match.h"
 #include "ovn-controller.h"
+#include "latch.h"
 #include "lib/packets.h"
 #include "lib/sset.h"
 #include "openvswitch/ofp-actions.h"
@@ -48,19 +49,13 @@ 
 #include "openvswitch/poll-loop.h"
 #include "openvswitch/rconn.h"
 #include "socket-util.h"
+#include "seq.h"
 #include "timeval.h"
 #include "vswitch-idl.h"
 #include "lflow.h"
 
 VLOG_DEFINE_THIS_MODULE(pinctrl);
 
-/* OpenFlow connection to the switch. */
-static struct rconn *swconn;
-
-/* Last seen sequence number for 'swconn'.  When this differs from
- * rconn_get_connection_seqno(rconn), 'swconn' has reconnected. */
-static unsigned int conn_seq_no;
-
 static void init_buffered_packets_map(void);
 static void destroy_buffered_packets_map(void);
 
@@ -76,11 +71,14 @@  static void run_put_mac_bindings(
     struct ovsdb_idl_index *sbrec_mac_binding_by_lport_ip);
 static void wait_put_mac_bindings(struct ovsdb_idl_txn *ovnsb_idl_txn);
 static void flush_put_mac_bindings(void);
+static void buffer_put_mac_bindings(void);
+static void destroy_buffered_mac_bindings(void);
+static void send_mac_binding_buffered_pkts(struct rconn *swconn);
 
 static void init_send_garps(void);
 static void destroy_send_garps(void);
 static void send_garp_wait(void);
-static void send_garp_run(
+static void send_garp_prepare(
     struct ovsdb_idl_index *sbrec_chassis_by_name,
     struct ovsdb_idl_index *sbrec_port_binding_by_datapath,
     struct ovsdb_idl_index *sbrec_port_binding_by_name,
@@ -88,45 +86,144 @@  static void send_garp_run(
     const struct sbrec_chassis *,
     const struct hmap *local_datapaths,
     const struct sset *active_tunnels);
-static void pinctrl_handle_nd_na(const struct flow *ip_flow,
+static void send_garp_run(struct rconn *swconn);
+static void pinctrl_handle_nd_na(struct rconn *swconn,
+                                 const struct flow *ip_flow,
                                  const struct match *md,
                                  struct ofpbuf *userdata,
                                  bool is_router);
 static void reload_metadata(struct ofpbuf *ofpacts,
                             const struct match *md);
 static void pinctrl_handle_put_nd_ra_opts(
+    struct rconn *swconn,
     const struct flow *ip_flow, struct dp_packet *pkt_in,
     struct ofputil_packet_in *pin, struct ofpbuf *userdata,
     struct ofpbuf *continuation);
-static void pinctrl_handle_nd_ns(const struct flow *ip_flow,
+static void pinctrl_handle_nd_ns(struct rconn *swconn,
+                                 const struct flow *ip_flow,
                                  struct dp_packet *pkt_in,
                                  const struct match *md,
                                  struct ofpbuf *userdata);
 static void init_ipv6_ras(void);
 static void destroy_ipv6_ras(void);
 static void ipv6_ra_wait(void);
-static void send_ipv6_ras(
+static void prepare_ipv6_ras(
     struct ovsdb_idl_index *sbrec_port_binding_by_datapath,
     struct ovsdb_idl_index *sbrec_port_binding_by_name,
     const struct hmap *local_datapaths);
-;
+static void send_ipv6_ras(struct rconn *swconn);
 
 COVERAGE_DEFINE(pinctrl_drop_put_mac_binding);
 COVERAGE_DEFINE(pinctrl_drop_buffered_packets_map);
 
+/* The pinctrl module creates a thread - pinctrl_handler - to handle
+ * the packet-ins from ovs-vswitchd. Some of the OVN actions
+ * are translated to OF 'controller' actions. See include/ovn/actions.h
+ * for more details.
+ *
+ * The pinctrl_handler thread doesn't access the Southbound IDL object. But
+ * some of the OVN actions which get translated to the 'controller'
+ * OF action require data from the Southbound DB.  Below are the details
+ * on how these actions are implemented.
+ *
+ * The pinctrl_run() function is called by the main ovn-controller thread.
+ * A mutex - 'pinctrl_mutex' - is used between the pinctrl_handler() thread
+ * and pinctrl_run().
+ *
+ *   - dns_lookup -     In order to do a DNS lookup, this action needs
+ *                      to access the 'DNS' table. pinctrl_run() builds a
+ *                      local DNS cache - 'dns_cache'. See sync_dns_cache()
+ *                      for more details.
+ *                      The function 'pinctrl_handle_dns_lookup()' (which is
+ *                      called within the pinctrl_handler thread) looks into
+ *                      the local DNS cache to resolve the DNS requests.
+ *
+ *   - put_arp/put_nd - These actions store the IPv4/IPv6 and MAC addresses
+ *                      in the 'MAC_Binding' table.
+ *                      The function 'pinctrl_handle_put_mac_binding()' (which
+ *                      is called within the pinctrl_handler thread) stores
+ *                      the IPv4/IPv6 and MAC addresses in the
+ *                      hmap - put_mac_bindings.
+ *
+ *                      pinctrl_run() reads these mac bindings from the hmap
+ *                      'put_mac_bindings' and writes to the 'MAC_Binding'
+ *                      table in the Southbound DB.
+ *
+ *   - arp/nd_ns      - These actions generate ARP/IPv6 Neighbor Solicitation
+ *                      requests. The original packets are buffered and
+ *                      injected back when put_arp/put_nd actions are called.
+ *                      When pinctrl_run() writes the mac bindings from the
+ *                      'put_mac_bindings' hmap, it moves these mac bindings
+ *                      to another hmap - 'buffered_mac_bindings'.
+ *
+ *                      The pinctrl_handler thread calls the function -
+ *                      send_mac_binding_buffered_pkts(), which uses
+ *                      the hmap - 'buffered_mac_bindings' and reinjects the
+ *                      buffered packets.
+ *
+ * The pinctrl module also periodically sends IPv6 Router Advertisements
+ * and gARPs (for the router gateway IPs and configured NAT addresses).
+ *
+ * IPv6 RA handling - pinctrl_run() prepares the IPv6 RA information
+ *                    (see prepare_ipv6_ras()) in the shash 'ipv6_ras' by
+ *                    looking into the Southbound DB table - Port_Binding.
+ *
+ *                    pinctrl_handler thread sends the periodic IPv6 RAs using
+ *                    the shash - 'ipv6_ras'.
+ *
+ * gARP handling    - pinctrl_run() prepares the gARP information
+ *                    (see send_garp_prepare()) in the shash 'send_garp_data'
+ *                    by looking into the Southbound DB table Port_Binding.
+ *
+ *                    pinctrl_handler() thread sends these gARPs using the
+ *                    shash 'send_garp_data'.
+ *
+ * Notification between pinctrl_handler() and pinctrl_run()
+ * -------------------------------------------------------
+ * 'struct seq' is used for notification between the pinctrl_handler() thread
+ *  and pinctrl_run().
+ *  'pinctrl_handler_seq' is used by pinctrl_run() to
+ *  wake up pinctrl_handler thread from poll_block() if any changes happened
+ *  in 'send_garp_data', 'ipv6_ras' and 'buffered_mac_bindings' structures.
+ *
+ *  'pinctrl_main_seq' is used by pinctrl_handler() thread to wake up
+ *  the main thread from poll_block() when mac bindings need to be updated
+ *  in the Southbound DB.
+ */
+
+static struct ovs_mutex pinctrl_mutex = OVS_MUTEX_INITIALIZER;
+static struct seq *pinctrl_handler_seq;
+static struct seq *pinctrl_main_seq;
+
+static void *pinctrl_handler(void *arg);
+
+struct pinctrl {
+    char *br_int_name;
+    pthread_t pinctrl_thread;
+    /* Latch to destroy the 'pinctrl_thread'. */
+    struct latch pinctrl_thread_exit;
+};
+
+static struct pinctrl pinctrl;
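
The comment block above leans on two small OVS primitives, 'seq' (lib/seq.h)
and 'latch' (lib/latch.h).  As a minimal sketch of the wake-up pattern it
describes - the names 'shared_seq', 'exit_latch', 'producer_notify' and
'consumer_iteration' are illustrative, not names from this patch - the
producer bumps a seq after mutating shared state, and the consumer waits on
the value it last read:

#include <stdint.h>
#include "latch.h"
#include "openvswitch/poll-loop.h"
#include "seq.h"

static struct seq *shared_seq;    /* seq_create()'d at init time. */
static struct latch exit_latch;   /* latch_init()'d at init time. */

/* Producer side (e.g. the main thread): after changing shared state,
 * bump the seq so any thread sleeping in poll_block() wakes up. */
static void
producer_notify(void)
{
    seq_change(shared_seq);
}

/* Consumer side (e.g. the handler thread), one loop iteration.
 * Reading the seq and then waiting on that exact value avoids lost
 * wakeups: if seq_change() ran in between, poll_block() returns
 * immediately. */
static void
consumer_iteration(void)
{
    uint64_t seqno = seq_read(shared_seq);
    /* ... process shared state (under the mutex) ... */
    seq_wait(shared_seq, seqno);
    latch_wait(&exit_latch);
    poll_block();
}

This is exactly how pinctrl_handler_seq and pinctrl_main_seq are used in the
patch, just with the producer and consumer roles swapped per direction.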
+
 void
 pinctrl_init(void)
 {
-    swconn = rconn_create(5, 0, DSCP_DEFAULT, 1 << OFP13_VERSION);
-    conn_seq_no = 0;
     init_put_mac_bindings();
     init_send_garps();
     init_ipv6_ras();
     init_buffered_packets_map();
+    pinctrl.br_int_name = NULL;
+    pinctrl_handler_seq = seq_create();
+    pinctrl_main_seq = seq_create();
+
+    latch_init(&pinctrl.pinctrl_thread_exit);
+    pinctrl.pinctrl_thread = ovs_thread_create("ovn_pinctrl", pinctrl_handler,
+                                                &pinctrl);
 }
 
 static ovs_be32
-queue_msg(struct ofpbuf *msg)
+queue_msg(struct rconn *swconn, struct ofpbuf *msg)
 {
     const struct ofp_header *oh = msg->data;
     ovs_be32 xid = oh->xid;
@@ -137,16 +234,17 @@  queue_msg(struct ofpbuf *msg)
 
 /* Sets up global 'swconn', a newly (re)connected connection to a switch. */
 static void
-pinctrl_setup(void)
+pinctrl_setup(struct rconn *swconn)
 {
     /* Fetch the switch configuration.  The response later will allow us to
      * change the miss_send_len to UINT16_MAX, so that we can enable
      * asynchronous messages. */
-    queue_msg(ofpraw_alloc(OFPRAW_OFPT_GET_CONFIG_REQUEST,
-                           rconn_get_version(swconn), 0));
+    queue_msg(swconn, ofpraw_alloc(OFPRAW_OFPT_GET_CONFIG_REQUEST,
+                                   rconn_get_version(swconn), 0));
 
     /* Set a packet-in format that supports userdata.  */
-    queue_msg(ofputil_encode_set_packet_in_format(rconn_get_version(swconn),
+    queue_msg(swconn,
+              ofputil_encode_set_packet_in_format(rconn_get_version(swconn),
                                                   OFPUTIL_PACKET_IN_NXT2));
 }
 
@@ -156,13 +254,14 @@  set_switch_config(struct rconn *swconn_,
 {
     enum ofp_version version = rconn_get_version(swconn_);
     struct ofpbuf *request = ofputil_encode_set_config(config, version);
-    queue_msg(request);
+    queue_msg(swconn_, request);
 }
 
 static void
-set_actions_and_enqueue_msg(const struct dp_packet *packet,
-                           const struct match *md,
-                           struct ofpbuf *userdata)
+set_actions_and_enqueue_msg(struct rconn *swconn,
+                            const struct dp_packet *packet,
+                            const struct match *md,
+                            struct ofpbuf *userdata)
 {
     /* Copy metadata from 'md' into the packet-out via "set_field"
      * actions, then add actions from 'userdata'.
@@ -192,7 +291,7 @@  set_actions_and_enqueue_msg(const struct dp_packet *packet,
     };
     match_set_in_port(&po.flow_metadata, OFPP_CONTROLLER);
     enum ofputil_protocol proto = ofputil_protocol_from_ofp_version(version);
-    queue_msg(ofputil_encode_packet_out(&po, proto));
+    queue_msg(swconn, ofputil_encode_packet_out(&po, proto));
     ofpbuf_uninit(&ofpacts);
 }
 
@@ -275,7 +374,8 @@  buffered_push_packet(struct buffered_packets *bp,
 }
 
 static void
-buffered_send_packets(struct buffered_packets *bp, struct eth_addr *addr)
+buffered_send_packets(struct rconn *swconn,
+                      struct buffered_packets *bp, struct eth_addr *addr)
 {
     enum ofp_version version = rconn_get_version(swconn);
     enum ofputil_protocol proto = ofputil_protocol_from_ofp_version(version);
@@ -293,7 +393,7 @@  buffered_send_packets(struct buffered_packets *bp, struct eth_addr *addr)
             .ofpacts_len = bi->ofpacts.size,
         };
         match_set_in_port(&po.flow_metadata, OFPP_CONTROLLER);
-        queue_msg(ofputil_encode_packet_out(&po, proto));
+        queue_msg(swconn, ofputil_encode_packet_out(&po, proto));
 
         ofpbuf_uninit(&bi->ofpacts);
         dp_packet_delete(bi->p);
@@ -330,6 +430,7 @@  pinctrl_find_buffered_packets(const struct in6_addr *ip, uint32_t hash)
     return NULL;
 }
 
+/* Called within the pinctrl_handler thread context. */
 static int
 pinctrl_handle_buffered_packets(const struct flow *ip_flow,
                                 struct dp_packet *pkt_in,
@@ -367,8 +468,10 @@  pinctrl_handle_buffered_packets(const struct flow *ip_flow,
     return 0;
 }
 
+/* Called within the pinctrl_handler thread context. */
 static void
-pinctrl_handle_arp(const struct flow *ip_flow, struct dp_packet *pkt_in,
+pinctrl_handle_arp(struct rconn *swconn, const struct flow *ip_flow,
+                   struct dp_packet *pkt_in,
                    const struct match *md, struct ofpbuf *userdata)
 {
     /* This action only works for IP packets, and the switch should only send
@@ -404,12 +507,14 @@  pinctrl_handle_arp(const struct flow *ip_flow, struct dp_packet *pkt_in,
                       ip_flow->vlans[0].tci);
     }
 
-    set_actions_and_enqueue_msg(&packet, md, userdata);
+    set_actions_and_enqueue_msg(swconn, &packet, md, userdata);
     dp_packet_uninit(&packet);
 }
 
+/* Called within the pinctrl_handler thread context. */
 static void
-pinctrl_handle_icmp(const struct flow *ip_flow, struct dp_packet *pkt_in,
+pinctrl_handle_icmp(struct rconn *swconn, const struct flow *ip_flow,
+                    struct dp_packet *pkt_in,
                     const struct match *md, struct ofpbuf *userdata)
 {
     /* This action only works for IP packets, and the switch should only send
@@ -483,12 +588,14 @@  pinctrl_handle_icmp(const struct flow *ip_flow, struct dp_packet *pkt_in,
                       ip_flow->vlans[0].tci);
     }
 
-    set_actions_and_enqueue_msg(&packet, md, userdata);
+    set_actions_and_enqueue_msg(swconn, &packet, md, userdata);
     dp_packet_uninit(&packet);
 }
 
+/* Called within the pinctrl_handler thread context. */
 static void
-pinctrl_handle_tcp_reset(const struct flow *ip_flow, struct dp_packet *pkt_in,
+pinctrl_handle_tcp_reset(struct rconn *swconn,
+                         const struct flow *ip_flow, struct dp_packet *pkt_in,
                          const struct match *md, struct ofpbuf *userdata)
 {
     /* This action only works for TCP segments, and the switch should only send
@@ -555,12 +662,14 @@  pinctrl_handle_tcp_reset(const struct flow *ip_flow, struct dp_packet *pkt_in,
                       ip_flow->vlans[0].tci);
     }
 
-    set_actions_and_enqueue_msg(&packet, md, userdata);
+    set_actions_and_enqueue_msg(swconn, &packet, md, userdata);
     dp_packet_uninit(&packet);
 }
 
+/* Called within the pinctrl_handler thread context. */
 static void
 pinctrl_handle_put_dhcp_opts(
+    struct rconn *swconn,
     struct dp_packet *pkt_in, struct ofputil_packet_in *pin,
     struct ofpbuf *userdata, struct ofpbuf *continuation)
 {
@@ -806,7 +915,7 @@  exit:
         sv.u8_val = success;
         mf_write_subfield(&dst, &sv, &pin->flow_metadata);
     }
-    queue_msg(ofputil_encode_resume(pin, continuation, proto));
+    queue_msg(swconn, ofputil_encode_resume(pin, continuation, proto));
     if (pkt_out_ptr) {
         dp_packet_uninit(pkt_out_ptr);
     }
@@ -923,8 +1032,10 @@  compose_out_dhcpv6_opts(struct ofpbuf *userdata,
     return true;
 }
 
+/* Called within the pinctrl_handler thread context. */
 static void
 pinctrl_handle_put_dhcpv6_opts(
+    struct rconn *swconn,
     struct dp_packet *pkt_in, struct ofputil_packet_in *pin,
     struct ofpbuf *userdata, struct ofpbuf *continuation OVS_UNUSED)
 {
@@ -1105,7 +1216,7 @@  exit:
         sv.u8_val = success;
         mf_write_subfield(&dst, &sv, &pin->flow_metadata);
     }
-    queue_msg(ofputil_encode_resume(pin, continuation, proto));
+    queue_msg(swconn, ofputil_encode_resume(pin, continuation, proto));
     dp_packet_uninit(pkt_out_ptr);
 }
 
@@ -1121,9 +1232,83 @@  put_be32(struct ofpbuf *buf, ovs_be32 x)
     ofpbuf_put(buf, &x, sizeof x);
 }
 
+struct dns_data {
+    uint64_t *dps;
+    size_t n_dps;
+    struct smap records;
+    bool delete;
+};
+
+static struct shash dns_cache = SHASH_INITIALIZER(&dns_cache);
+
+/* Called by pinctrl_run(). Runs within the main ovn-controller
+ * thread context. */
+static void
+sync_dns_cache(const struct sbrec_dns_table *dns_table)
+{
+    struct shash_node *iter;
+    SHASH_FOR_EACH (iter, &dns_cache) {
+        struct dns_data *d = iter->data;
+        d->delete = true;
+    }
+
+    const struct sbrec_dns *sbrec_dns;
+    SBREC_DNS_TABLE_FOR_EACH (sbrec_dns, dns_table) {
+        const char *dns_id = smap_get(&sbrec_dns->external_ids, "dns_id");
+        if (!dns_id) {
+            continue;
+        }
+
+        struct dns_data *dns_data = shash_find_data(&dns_cache, dns_id);
+        if (!dns_data) {
+            dns_data = xmalloc(sizeof *dns_data);
+            smap_init(&dns_data->records);
+            shash_add(&dns_cache, dns_id, dns_data);
+            dns_data->n_dps = 0;
+            dns_data->dps = NULL;
+        } else {
+            free(dns_data->dps);
+        }
+
+        dns_data->delete = false;
+
+        if (!smap_equal(&dns_data->records, &sbrec_dns->records)) {
+            smap_clear(&dns_data->records);
+            smap_clone(&dns_data->records, &sbrec_dns->records);
+        }
+
+        dns_data->n_dps = sbrec_dns->n_datapaths;
+        dns_data->dps = xcalloc(dns_data->n_dps, sizeof(uint64_t));
+        for (size_t i = 0; i < sbrec_dns->n_datapaths; i++) {
+            dns_data->dps[i] = sbrec_dns->datapaths[i]->tunnel_key;
+        }
+    }
+
+    struct shash_node *next;
+    SHASH_FOR_EACH_SAFE (iter, next, &dns_cache) {
+        struct dns_data *d = iter->data;
+        if (d->delete) {
+            shash_delete(&dns_cache, iter);
+            free(d);
+        }
+    }
+}
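
sync_dns_cache() above is a mark-and-sweep reconciliation: mark every cached
entry as deletable, unmark (and refresh) whatever the Southbound 'DNS' table
still contains, then sweep what stayed marked.  The same idiom reduced to a
skeleton - 'struct entry' and 'cache' are illustrative stand-ins for
'struct dns_data' and 'dns_cache':

#include <stdbool.h>
#include <stdlib.h>
#include "openvswitch/shash.h"

struct entry {
    bool delete;
    /* ... cached payload ... */
};

static struct shash cache = SHASH_INITIALIZER(&cache);

static void
sync_cache_sketch(void)
{
    struct shash_node *iter, *next;

    /* 1. Mark: assume every cached entry has disappeared. */
    SHASH_FOR_EACH (iter, &cache) {
        struct entry *e = iter->data;
        e->delete = true;
    }

    /* 2. Refresh: walk the authoritative source here, clearing
     *    'delete' on rows that still exist and adding new ones. */

    /* 3. Sweep: whatever is still marked is gone from the source. */
    SHASH_FOR_EACH_SAFE (iter, next, &cache) {
        struct entry *e = iter->data;
        if (e->delete) {
            shash_delete(&cache, iter);
            free(e);
        }
    }
}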
+
+static void
+destroy_dns_cache(void)
+{
+    struct shash_node *iter, *next;
+    SHASH_FOR_EACH_SAFE (iter, next, &dns_cache) {
+        struct dns_data *d = iter->data;
+        shash_delete(&dns_cache, iter);
+        free(d);
+    }
+}
+
+/* Called within the pinctrl_handler thread context. */
 static void
 pinctrl_handle_dns_lookup(
-    const struct sbrec_dns_table *dns_table,
+    struct rconn *swconn,
     struct dp_packet *pkt_in, struct ofputil_packet_in *pin,
     struct ofpbuf *userdata, struct ofpbuf *continuation)
 {
@@ -1216,13 +1401,13 @@  pinctrl_handle_dns_lookup(
     }
 
     uint64_t dp_key = ntohll(pin->flow_metadata.flow.metadata);
-    const struct sbrec_dns *sbrec_dns;
     const char *answer_ips = NULL;
-    SBREC_DNS_TABLE_FOR_EACH (sbrec_dns, dns_table) {
-        for (size_t i = 0; i < sbrec_dns->n_datapaths; i++) {
-            if (sbrec_dns->datapaths[i]->tunnel_key == dp_key) {
-                answer_ips = smap_get(&sbrec_dns->records,
-                                      ds_cstr(&query_name));
+    struct shash_node *iter;
+    SHASH_FOR_EACH (iter, &dns_cache) {
+        struct dns_data *d = iter->data;
+        for (size_t i = 0; i < d->n_dps; i++) {
+            if (d->dps[i] == dp_key) {
+                answer_ips = smap_get(&d->records, ds_cstr(&query_name));
                 if (answer_ips) {
                     break;
                 }
@@ -1371,13 +1556,13 @@  exit:
         sv.u8_val = success;
         mf_write_subfield(&dst, &sv, &pin->flow_metadata);
     }
-    queue_msg(ofputil_encode_resume(pin, continuation, proto));
+    queue_msg(swconn, ofputil_encode_resume(pin, continuation, proto));
     dp_packet_uninit(pkt_out_ptr);
 }
 
+/* Called within the pinctrl_handler thread context. */
 static void
-process_packet_in(const struct ofp_header *msg,
-                  const struct sbrec_dns_table *dns_table)
+process_packet_in(struct rconn *swconn, const struct ofp_header *msg)
 {
     static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
 
@@ -1410,7 +1595,8 @@  process_packet_in(const struct ofp_header *msg,
 
     switch (ntohl(ah->opcode)) {
     case ACTION_OPCODE_ARP:
-        pinctrl_handle_arp(&headers, &packet, &pin.flow_metadata, &userdata);
+        pinctrl_handle_arp(swconn, &headers, &packet, &pin.flow_metadata,
+                           &userdata);
         break;
 
     case ACTION_OPCODE_PUT_ARP:
@@ -1419,15 +1605,18 @@  process_packet_in(const struct ofp_header *msg,
         break;
 
     case ACTION_OPCODE_PUT_DHCP_OPTS:
-        pinctrl_handle_put_dhcp_opts(&packet, &pin, &userdata, &continuation);
+        pinctrl_handle_put_dhcp_opts(swconn, &packet, &pin, &userdata,
+                                     &continuation);
         break;
 
     case ACTION_OPCODE_ND_NA:
-        pinctrl_handle_nd_na(&headers, &pin.flow_metadata, &userdata, false);
+        pinctrl_handle_nd_na(swconn, &headers, &pin.flow_metadata, &userdata,
+                             false);
         break;
 
     case ACTION_OPCODE_ND_NA_ROUTER:
-        pinctrl_handle_nd_na(&headers, &pin.flow_metadata, &userdata, true);
+        pinctrl_handle_nd_na(swconn, &headers, &pin.flow_metadata, &userdata,
+                             true);
         break;
 
     case ACTION_OPCODE_PUT_ND:
@@ -1436,13 +1625,13 @@  process_packet_in(const struct ofp_header *msg,
         break;
 
     case ACTION_OPCODE_PUT_DHCPV6_OPTS:
-        pinctrl_handle_put_dhcpv6_opts(&packet, &pin, &userdata,
+        pinctrl_handle_put_dhcpv6_opts(swconn, &packet, &pin, &userdata,
                                        &continuation);
         break;
 
     case ACTION_OPCODE_DNS_LOOKUP:
-        pinctrl_handle_dns_lookup(dns_table,
-                                  &packet, &pin, &userdata, &continuation);
+        pinctrl_handle_dns_lookup(swconn, &packet, &pin, &userdata,
+                                  &continuation);
         break;
 
     case ACTION_OPCODE_LOG:
@@ -1450,22 +1639,22 @@  process_packet_in(const struct ofp_header *msg,
         break;
 
     case ACTION_OPCODE_PUT_ND_RA_OPTS:
-        pinctrl_handle_put_nd_ra_opts(&headers, &packet, &pin, &userdata,
-                                      &continuation);
+        pinctrl_handle_put_nd_ra_opts(swconn, &headers, &packet, &pin,
+                                      &userdata, &continuation);
         break;
 
     case ACTION_OPCODE_ND_NS:
-        pinctrl_handle_nd_ns(&headers, &packet, &pin.flow_metadata,
+        pinctrl_handle_nd_ns(swconn, &headers, &packet, &pin.flow_metadata,
                              &userdata);
         break;
 
     case ACTION_OPCODE_ICMP:
-        pinctrl_handle_icmp(&headers, &packet, &pin.flow_metadata,
+        pinctrl_handle_icmp(swconn, &headers, &packet, &pin.flow_metadata,
                             &userdata);
         break;
 
     case ACTION_OPCODE_TCP_RESET:
-        pinctrl_handle_tcp_reset(&headers, &packet, &pin.flow_metadata,
+        pinctrl_handle_tcp_reset(swconn, &headers, &packet, &pin.flow_metadata,
                                  &userdata);
         break;
 
@@ -1476,12 +1665,13 @@  process_packet_in(const struct ofp_header *msg,
     }
 }
 
+/* Called within the pinctrl_handler thread context. */
 static void
-pinctrl_recv(const struct sbrec_dns_table *dns_table,
-             const struct ofp_header *oh, enum ofptype type)
+pinctrl_recv(struct rconn *swconn, const struct ofp_header *oh,
+             enum ofptype type)
 {
     if (type == OFPTYPE_ECHO_REQUEST) {
-        queue_msg(ofputil_encode_echo_reply(oh));
+        queue_msg(swconn, ofputil_encode_echo_reply(oh));
     } else if (type == OFPTYPE_GET_CONFIG_REPLY) {
         /* Enable asynchronous messages */
         struct ofputil_switch_config config;
@@ -1490,7 +1680,7 @@  pinctrl_recv(const struct sbrec_dns_table *dns_table,
         config.miss_send_len = UINT16_MAX;
         set_switch_config(swconn, &config);
     } else if (type == OFPTYPE_PACKET_IN) {
-        process_packet_in(oh, dns_table);
+        process_packet_in(swconn, oh);
     } else {
         if (VLOG_IS_DBG_ENABLED()) {
             static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(30, 300);
@@ -1503,6 +1693,107 @@  pinctrl_recv(const struct sbrec_dns_table *dns_table,
     }
 }
 
+
+/* Called within the main ovn-controller thread context. */
+static void
+notify_pinctrl_handler(void)
+{
+    seq_change(pinctrl_handler_seq);
+}
+
+/* Called within the pinctrl_handler thread context. */
+static void
+notify_pinctrl_main(void)
+{
+    seq_change(pinctrl_main_seq);
+}
+
+/* pinctrl_handler pthread function. */
+static void *
+pinctrl_handler(void *arg_)
+{
+    struct pinctrl *pctrl = arg_;
+    /* OpenFlow connection to the switch. */
+    struct rconn *swconn;
+    /* Last seen sequence number for 'swconn'.  When this differs from
+     * rconn_get_connection_seqno(rconn), 'swconn' has reconnected. */
+    unsigned int conn_seq_no;
+
+    swconn = rconn_create(5, 0, DSCP_DEFAULT, 1 << OFP13_VERSION);
+    conn_seq_no = 0;
+    char *br_int_name = NULL;
+    uint64_t new_seq;
+    while (!latch_is_set(&pctrl->pinctrl_thread_exit)) {
+        ovs_mutex_lock(&pinctrl_mutex);
+        if (pctrl->br_int_name) {
+            if (!br_int_name || strcmp(br_int_name, pctrl->br_int_name)) {
+                free(br_int_name);
+                br_int_name = xstrdup(pctrl->br_int_name);
+            }
+        }
+
+        if (br_int_name) {
+            char *target;
+
+            target = xasprintf("unix:%s/%s.mgmt", ovs_rundir(), br_int_name);
+            if (strcmp(target, rconn_get_target(swconn))) {
+                VLOG_INFO("%s: connecting to switch", target);
+                rconn_connect(swconn, target, target);
+            }
+            free(target);
+        } else {
+            rconn_disconnect(swconn);
+        }
+
+        rconn_run(swconn);
+        if (rconn_is_connected(swconn)) {
+            if (conn_seq_no != rconn_get_connection_seqno(swconn)) {
+                pinctrl_setup(swconn);
+                conn_seq_no = rconn_get_connection_seqno(swconn);
+                flush_put_mac_bindings();
+            }
+
+            /* Process a limited number of messages per call. */
+            for (int i = 0; i < 50; i++) {
+                struct ofpbuf *msg = rconn_recv(swconn);
+                if (!msg) {
+                    break;
+                }
+
+                const struct ofp_header *oh = msg->data;
+                enum ofptype type;
+
+                ofptype_decode(&type, oh);
+                pinctrl_recv(swconn, oh, type);
+                ofpbuf_delete(msg);
+            }
+
+            send_garp_run(swconn);
+            send_ipv6_ras(swconn);
+            send_mac_binding_buffered_pkts(swconn);
+            buffered_packets_map_gc();
+        }
+
+        ovs_mutex_unlock(&pinctrl_mutex);
+
+        rconn_run_wait(swconn);
+        rconn_recv_wait(swconn);
+        send_garp_wait();
+        ipv6_ra_wait();
+
+        new_seq = seq_read(pinctrl_handler_seq);
+        seq_wait(pinctrl_handler_seq, new_seq);
+
+        latch_wait(&pctrl->pinctrl_thread_exit);
+        poll_block();
+    }
+
+    free(br_int_name);
+    rconn_destroy(swconn);
+    return NULL;
+}
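
One detail in pinctrl_handler() worth calling out: 'pinctrl_mutex' is
released before any of the *_wait() calls and poll_block().  Sleeping with
the mutex held would stall pinctrl_run() in the main thread for the entire
nap.  The loop reduced to a skeleton, with illustrative names ('mutex',
'notify_seq', 'exit_latch'):

static struct ovs_mutex mutex = OVS_MUTEX_INITIALIZER;
static struct seq *notify_seq;    /* seq_create()'d at startup. */
static struct latch exit_latch;   /* latch_init()'d at startup. */

static void *
worker_main(void *arg OVS_UNUSED)
{
    while (!latch_is_set(&exit_latch)) {
        ovs_mutex_lock(&mutex);
        /* ... work on the shared data structures ... */
        ovs_mutex_unlock(&mutex);  /* Unlock BEFORE registering waits. */

        /* Register wakeup conditions, then sleep with the mutex free. */
        seq_wait(notify_seq, seq_read(notify_seq));
        latch_wait(&exit_latch);
        poll_block();
    }
    return NULL;
}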
+
+/* Called by ovn-controller. */
 void
 pinctrl_run(struct ovsdb_idl_txn *ovnsb_idl_txn,
             struct ovsdb_idl_index *sbrec_chassis_by_name,
@@ -1517,52 +1808,31 @@  pinctrl_run(struct ovsdb_idl_txn *ovnsb_idl_txn,
             const struct hmap *local_datapaths,
             const struct sset *active_tunnels)
 {
-    char *target = xasprintf("unix:%s/%s.mgmt", ovs_rundir(), br_int->name);
-    if (strcmp(target, rconn_get_target(swconn))) {
-        VLOG_INFO("%s: connecting to switch", target);
-        rconn_connect(swconn, target, target);
-    }
-    free(target);
-
-    rconn_run(swconn);
-
-    if (!rconn_is_connected(swconn)) {
-        return;
-    }
-
-    if (conn_seq_no != rconn_get_connection_seqno(swconn)) {
-        pinctrl_setup();
-        conn_seq_no = rconn_get_connection_seqno(swconn);
-        flush_put_mac_bindings();
-    }
-
-    /* Process a limited number of messages per call. */
-    for (int i = 0; i < 50; i++) {
-        struct ofpbuf *msg = rconn_recv(swconn);
-        if (!msg) {
-            break;
+    ovs_mutex_lock(&pinctrl_mutex);
+    if (br_int && (!pinctrl.br_int_name || strcmp(pinctrl.br_int_name,
+                                                  br_int->name))) {
+        if (pinctrl.br_int_name) {
+            free(pinctrl.br_int_name);
         }
-
-        const struct ofp_header *oh = msg->data;
-        enum ofptype type;
-
-        ofptype_decode(&type, oh);
-        pinctrl_recv(dns_table, oh, type);
-        ofpbuf_delete(msg);
+        pinctrl.br_int_name = xstrdup(br_int->name);
+        /* Notify pinctrl_handler that the integration bridge is
+         * set/changed. */
+        notify_pinctrl_handler();
     }
-
     run_put_mac_bindings(ovnsb_idl_txn, sbrec_datapath_binding_by_key,
                          sbrec_port_binding_by_key,
                          sbrec_mac_binding_by_lport_ip);
-    send_garp_run(sbrec_chassis_by_name, sbrec_port_binding_by_datapath,
-                  sbrec_port_binding_by_name, br_int, chassis,
-                  local_datapaths, active_tunnels);
-    send_ipv6_ras(sbrec_port_binding_by_datapath,
-                  sbrec_port_binding_by_name, local_datapaths);
-    buffered_packets_map_gc();
+    send_garp_prepare(sbrec_chassis_by_name, sbrec_port_binding_by_datapath,
+                      sbrec_port_binding_by_name, br_int, chassis,
+                      local_datapaths, active_tunnels);
+    prepare_ipv6_ras(sbrec_port_binding_by_datapath,
+                     sbrec_port_binding_by_name, local_datapaths);
+    sync_dns_cache(dns_table);
+    ovs_mutex_unlock(&pinctrl_mutex);
 }
 
-/* Table of ipv6_ra_state structures, keyed on logical port name */
+/* Table of ipv6_ra_state structures, keyed on logical port name.
+ * Protected by pinctrl_mutex. */
 static struct shash ipv6_ras;
 
 /* Next IPV6 RA in seconds. */
@@ -1706,8 +1976,9 @@  put_load(uint64_t value, enum mf_field_id dst, int ofs, int n_bits,
     bitwise_one(ofpact_set_field_mask(sf), sf->field->n_bytes, ofs, n_bits);
 }
 
+/* Called within the pinctrl_handler thread context. */
 static long long int
-ipv6_ra_send(struct ipv6_ra_state *ra)
+ipv6_ra_send(struct ipv6_ra_state *ra, struct rconn *swconn)
 {
     if (time_msec() < ra->next_announce) {
         return ra->next_announce;
@@ -1754,7 +2025,7 @@  ipv6_ra_send(struct ipv6_ra_state *ra)
     match_set_in_port(&po.flow_metadata, OFPP_CONTROLLER);
     enum ofp_version version = rconn_get_version(swconn);
     enum ofputil_protocol proto = ofputil_protocol_from_ofp_version(version);
-    queue_msg(ofputil_encode_packet_out(&po, proto));
+    queue_msg(swconn, ofputil_encode_packet_out(&po, proto));
     dp_packet_uninit(&packet);
     ofpbuf_uninit(&ofpacts);
 
@@ -1764,26 +2035,47 @@  ipv6_ra_send(struct ipv6_ra_state *ra)
     return ra->next_announce;
 }
 
+/* Called within the pinctrl_handler thread context. */
 static void
 ipv6_ra_wait(void)
 {
-    poll_timer_wait_until(send_ipv6_ra_time);
+    /* Set the poll timer for the next IPv6 RA only if IPv6 RAs need to
+     * be sent. */
+    if (!shash_is_empty(&ipv6_ras)) {
+        poll_timer_wait_until(send_ipv6_ra_time);
+    }
 }
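
The emptiness guard above (send_garp_wait() gets the matching one later in
this patch) matters because 'send_ipv6_ra_time' can hold a deadline left
over from RA state that has since been deleted; an unconditional
poll_timer_wait_until() on that stale value would keep waking an otherwise
idle thread.  The shape of the guard, with illustrative names
('periodic_work', 'next_deadline_msec'):

/* Arm the poll timer only when periodic work actually exists;
 * a stale deadline would otherwise wake the loop for nothing. */
if (!shash_is_empty(&periodic_work)) {
    poll_timer_wait_until(next_deadline_msec);
}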
 
+/* Called within the pinctrl_handler thread context. */
 static void
-send_ipv6_ras(struct ovsdb_idl_index *sbrec_port_binding_by_datapath,
-              struct ovsdb_idl_index *sbrec_port_binding_by_name,
-              const struct hmap *local_datapaths)
+send_ipv6_ras(struct rconn *swconn)
 {
-    struct shash_node *iter, *iter_next;
-
     send_ipv6_ra_time = LLONG_MAX;
+    struct shash_node *iter;
+    SHASH_FOR_EACH (iter, &ipv6_ras) {
+        struct ipv6_ra_state *ra = iter->data;
+        long long int next_ra = ipv6_ra_send(ra, swconn);
+        if (send_ipv6_ra_time > next_ra) {
+            send_ipv6_ra_time = next_ra;
+        }
+    }
+}
+
+/* Called by pinctrl_run(). Runs within the main ovn-controller
+ * thread context. */
+static void
+prepare_ipv6_ras(struct ovsdb_idl_index *sbrec_port_binding_by_datapath,
+                 struct ovsdb_idl_index *sbrec_port_binding_by_name,
+                 const struct hmap *local_datapaths)
+{
+    struct shash_node *iter, *iter_next;
 
     SHASH_FOR_EACH (iter, &ipv6_ras) {
         struct ipv6_ra_state *ra = iter->data;
         ra->delete_me = true;
     }
 
+    bool changed = false;
     const struct local_datapath *ld;
     HMAP_FOR_EACH (ld, hmap_node, local_datapaths) {
         struct sbrec_port_binding *target = sbrec_port_binding_index_init_row(
@@ -1822,6 +2114,7 @@  send_ipv6_ras(struct ovsdb_idl_index *sbrec_port_binding_by_datapath,
                     ra->config->min_interval,
                     ra->config->max_interval);
                 shash_add(&ipv6_ras, pb->logical_port, ra);
+                changed = true;
             } else {
                 if (config->min_interval != ra->config->min_interval ||
                     config->max_interval != ra->config->max_interval)
@@ -1840,10 +2133,7 @@  send_ipv6_ras(struct ovsdb_idl_index *sbrec_port_binding_by_datapath,
             ra->metadata = peer->datapath->tunnel_key;
             ra->delete_me = false;
 
-            long long int next_ra = ipv6_ra_send(ra);
-            if (send_ipv6_ra_time > next_ra) {
-                send_ipv6_ra_time = next_ra;
-            }
+            /* pinctrl_handler thread will send the IPv6 RAs. */
         }
         sbrec_port_binding_index_destroy_row(target);
     }
@@ -1856,26 +2146,39 @@  send_ipv6_ras(struct ovsdb_idl_index *sbrec_port_binding_by_datapath,
             ipv6_ra_delete(ra);
         }
     }
+
+    if (changed) {
+        notify_pinctrl_handler();
+    }
+
 }
 
+/* Called by pinctrl_run(). Runs within the main ovn-controller
+ * thread context. */
 void
 pinctrl_wait(struct ovsdb_idl_txn *ovnsb_idl_txn)
 {
     wait_put_mac_bindings(ovnsb_idl_txn);
-    rconn_run_wait(swconn);
-    rconn_recv_wait(swconn);
-    send_garp_wait();
-    ipv6_ra_wait();
+    uint64_t new_seq = seq_read(pinctrl_main_seq);
+    seq_wait(pinctrl_main_seq, new_seq);
 }
 
+/* Called by ovn-controller. */
 void
 pinctrl_destroy(void)
 {
-    rconn_destroy(swconn);
-    destroy_put_mac_bindings();
+    latch_set(&pinctrl.pinctrl_thread_exit);
+    pthread_join(pinctrl.pinctrl_thread, NULL);
+    latch_destroy(&pinctrl.pinctrl_thread_exit);
+    free(pinctrl.br_int_name);
     destroy_send_garps();
     destroy_ipv6_ras();
     destroy_buffered_packets_map();
+    destroy_put_mac_bindings();
+    destroy_buffered_mac_bindings();
+    destroy_dns_cache();
+    seq_destroy(pinctrl_main_seq);
+    seq_destroy(pinctrl_handler_seq);
 }
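
pinctrl_destroy() above also shows the shutdown handshake for a
latch-controlled worker: setting the latch both requests exit and wakes the
thread out of poll_block() (the loop does latch_wait() before blocking), so
the join cannot hang.  In skeleton form, with illustrative names:

latch_set(&exit_latch);             /* Request exit; wakes poll_block(). */
pthread_join(worker_thread, NULL);  /* Wait for the loop to return. */
latch_destroy(&exit_latch);
/* Only now is it safe to free state the worker was using. */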
 
 /* Implementation of the "put_arp" and "put_nd" OVN actions.  These
@@ -1906,11 +2209,13 @@  struct put_mac_binding {
 
 /* Contains "struct put_mac_binding"s. */
 static struct hmap put_mac_bindings;
+static struct hmap buffered_mac_bindings;
 
 static void
 init_put_mac_bindings(void)
 {
     hmap_init(&put_mac_bindings);
+    hmap_init(&buffered_mac_bindings);
 }
 
 static void
@@ -1920,6 +2225,17 @@  destroy_put_mac_bindings(void)
     hmap_destroy(&put_mac_bindings);
 }
 
+static void
+destroy_buffered_mac_bindings(void)
+{
+    struct put_mac_binding *pmb;
+    HMAP_FOR_EACH_POP (pmb, hmap_node, &buffered_mac_bindings) {
+        free(pmb);
+    }
+
+    hmap_destroy(&buffered_mac_bindings);
+}
+
 static struct put_mac_binding *
 pinctrl_find_put_mac_binding(uint32_t dp_key, uint32_t port_key,
                              const struct in6_addr *ip_key, uint32_t hash)
@@ -1935,13 +2251,13 @@  pinctrl_find_put_mac_binding(uint32_t dp_key, uint32_t port_key,
     return NULL;
 }
 
+/* Called within the pinctrl_handler thread context. */
 static void
 pinctrl_handle_put_mac_binding(const struct flow *md,
                                const struct flow *headers, bool is_arp)
 {
     uint32_t dp_key = ntohll(md->metadata);
     uint32_t port_key = md->regs[MFF_LOG_INPORT - MFF_REG0];
-    struct buffered_packets *bp;
     struct in6_addr ip_key;
 
     if (is_arp) {
@@ -1969,11 +2285,27 @@  pinctrl_handle_put_mac_binding(const struct flow *md,
     pmb->timestamp = time_msec();
     pmb->mac = headers->dl_src;
 
-    /* send queued pkts */
-    uint32_t bhash = hash_bytes(&ip_key, sizeof ip_key, 0);
-    bp = pinctrl_find_buffered_packets(&ip_key, bhash);
-    if (bp) {
-        buffered_send_packets(bp, &pmb->mac);
+    /* We can send the buffered packets once the main ovn-controller
+     * thread calls pinctrl_run() and writes the mac bindings stored
+     * in the 'put_mac_bindings' hmap to the Southbound MAC_Binding table. */
+    notify_pinctrl_main();
+}
+
+/* Called within the pinctrl_handler thread context. */
+static void
+send_mac_binding_buffered_pkts(struct rconn *swconn)
+{
+    struct put_mac_binding *pmb;
+    struct buffered_packets *bp;
+    HMAP_FOR_EACH_POP (pmb, hmap_node, &buffered_mac_bindings) {
+        uint32_t bhash = hash_bytes(&pmb->ip_key, sizeof pmb->ip_key, 0);
+
+        bp = pinctrl_find_buffered_packets(&pmb->ip_key, bhash);
+        if (bp) {
+            buffered_send_packets(swconn, bp, &pmb->mac);
+        }
+
+        free(pmb);
     }
 }
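
Both send_mac_binding_buffered_pkts() above and buffer_put_mac_bindings()
below rely on HMAP_FOR_EACH_POP, which unlinks each node from the map before
the body runs, leaving the map empty afterwards so every element can be
freed or moved safely.  The drain idiom in isolation, with an illustrative
'struct item':

#include <stdlib.h>
#include "openvswitch/hmap.h"

struct item {
    struct hmap_node node;
    /* ... payload ... */
};

static void
drain_map(struct hmap *map)
{
    struct item *it;
    HMAP_FOR_EACH_POP (it, node, map) {
        /* ... consume 'it' (already unlinked from 'map') ... */
        free(it);
    }
}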
 
@@ -2043,6 +2375,8 @@  run_put_mac_binding(struct ovsdb_idl_txn *ovnsb_idl_txn,
     ds_destroy(&ip_s);
 }
 
+/* Called by pinctrl_run(). Runs within the main ovn-controller
+ * thread context. */
 static void
 run_put_mac_bindings(struct ovsdb_idl_txn *ovnsb_idl_txn,
                      struct ovsdb_idl_index *sbrec_datapath_binding_by_key,
@@ -2060,7 +2394,14 @@  run_put_mac_bindings(struct ovsdb_idl_txn *ovnsb_idl_txn,
                             sbrec_mac_binding_by_lport_ip,
                             pmb);
     }
-    flush_put_mac_bindings();
+
+    /* Move the mac bindings from the 'put_mac_bindings' hmap to
+     * 'buffered_mac_bindings' and notify the pinctrl_handler.
+     * pinctrl_handler will reinject the buffered packets. */
+    if (!hmap_is_empty(&put_mac_bindings)) {
+        buffer_put_mac_bindings();
+        notify_pinctrl_handler();
+    }
 }
 
 static void
@@ -2071,6 +2412,17 @@  wait_put_mac_bindings(struct ovsdb_idl_txn *ovnsb_idl_txn)
     }
 }
 
+static void
+buffer_put_mac_bindings(void)
+{
+    struct put_mac_binding *pmb;
+    HMAP_FOR_EACH_POP (pmb, hmap_node, &put_mac_bindings) {
+        uint32_t hash = hash_bytes(&pmb->ip_key, sizeof pmb->ip_key,
+                                   hash_2words(pmb->dp_key, pmb->port_key));
+        hmap_insert(&buffered_mac_bindings, &pmb->hmap_node, hash);
+    }
+}
+
 static void
 flush_put_mac_bindings(void)
 {
@@ -2097,7 +2449,7 @@  struct garp_data {
     uint32_t port_key;           /* Port to inject the GARP into. */
 };
 
-/* Contains GARPs to be sent. */
+/* Contains GARPs to be sent. Protected by pinctrl_mutex. */
 static struct shash send_garp_data;
 
 /* Next GARP announcement in ms. */
@@ -2116,6 +2468,7 @@  destroy_send_garps(void)
     shash_destroy_free_data(&send_garp_data);
 }
 
+/* Runs within the main ovn-controller thread context. */
 static void
 add_garp(const char *name, const struct eth_addr ea, ovs_be32 ip,
          uint32_t dp_key, uint32_t port_key)
@@ -2128,6 +2481,10 @@  add_garp(const char *name, const struct eth_addr ea, ovs_be32 ip,
     garp->dp_key = dp_key;
     garp->port_key = port_key;
     shash_add(&send_garp_data, name, garp);
+
+    /* Notify pinctrl_handler so that it can wake up and process
+     * these GARP requests. */
+    notify_pinctrl_handler();
 }
 
 /* Add or update a vif for which GARPs need to be announced. */
@@ -2199,10 +2556,14 @@  send_garp_delete(const char *lport)
 {
     struct garp_data *garp = shash_find_and_delete(&send_garp_data, lport);
     free(garp);
+    notify_pinctrl_handler();
 }
 
+/* Called by send_garp_run(). Runs within the pinctrl_handler
+ * thread context. */
 static long long int
-send_garp(struct garp_data *garp, long long int current_time)
+send_garp(struct garp_data *garp, long long int current_time,
+          struct rconn *swconn)
 {
     if (current_time < garp->announce_time) {
         return garp->announce_time;
@@ -2234,7 +2595,7 @@  send_garp(struct garp_data *garp, long long int current_time)
     };
     match_set_in_port(&po.flow_metadata, OFPP_CONTROLLER);
     enum ofputil_protocol proto = ofputil_protocol_from_ofp_version(version);
-    queue_msg(ofputil_encode_packet_out(&po, proto));
+    queue_msg(swconn, ofputil_encode_packet_out(&po, proto));
     dp_packet_uninit(&packet);
     ofpbuf_uninit(&ofpacts);
 
@@ -2500,17 +2861,40 @@  get_nat_addresses_and_keys(struct ovsdb_idl_index *sbrec_chassis_by_name,
 static void
 send_garp_wait(void)
 {
-    poll_timer_wait_until(send_garp_time);
+    /* Set the poll timer for the next GARP only if there is GARP data to
+     * be sent. */
+    if (!shash_is_empty(&send_garp_data)) {
+        poll_timer_wait_until(send_garp_time);
+    }
+}
+
+/* Called within the pinctrl_handler thread context. */
+static void
+send_garp_run(struct rconn *swconn)
+{
+    /* Send GARPs, and update the next announcement. */
+    struct shash_node *iter;
+    long long int current_time = time_msec();
+    send_garp_time = LLONG_MAX;
+    SHASH_FOR_EACH (iter, &send_garp_data) {
+        long long int next_announce = send_garp(iter->data, current_time,
+                                                swconn);
+        if (send_garp_time > next_announce) {
+            send_garp_time = next_announce;
+        }
+    }
 }
 
+/* Called by pinctrl_run(). Runs within the main ovn-controller
+ * thread context. */
 static void
-send_garp_run(struct ovsdb_idl_index *sbrec_chassis_by_name,
-              struct ovsdb_idl_index *sbrec_port_binding_by_datapath,
-              struct ovsdb_idl_index *sbrec_port_binding_by_name,
-              const struct ovsrec_bridge *br_int,
-              const struct sbrec_chassis *chassis,
-              const struct hmap *local_datapaths,
-              const struct sset *active_tunnels)
+send_garp_prepare(struct ovsdb_idl_index *sbrec_chassis_by_name,
+                  struct ovsdb_idl_index *sbrec_port_binding_by_datapath,
+                  struct ovsdb_idl_index *sbrec_port_binding_by_name,
+                  const struct ovsrec_bridge *br_int,
+                  const struct sbrec_chassis *chassis,
+                  const struct hmap *local_datapaths,
+                  const struct sset *active_tunnels)
 {
     struct sset localnet_vifs = SSET_INITIALIZER(&localnet_vifs);
     struct sset local_l3gw_ports = SSET_INITIALIZER(&local_l3gw_ports);
@@ -2558,15 +2942,8 @@  send_garp_run(struct ovsdb_idl_index *sbrec_chassis_by_name,
         }
     }
 
-    /* Send GARPs, and update the next announcement. */
-    long long int current_time = time_msec();
-    send_garp_time = LLONG_MAX;
-    SHASH_FOR_EACH (iter, &send_garp_data) {
-        long long int next_announce = send_garp(iter->data, current_time);
-        if (send_garp_time > next_announce) {
-            send_garp_time = next_announce;
-        }
-    }
+    /* pinctrl_handler thread will send the GARPs. */
+
     sset_destroy(&localnet_vifs);
     sset_destroy(&local_l3gw_ports);
 
@@ -2617,8 +2994,10 @@  reload_metadata(struct ofpbuf *ofpacts, const struct match *md)
     }
 }
 
+/* Called within the pinctrl_handler thread context. */
 static void
-pinctrl_handle_nd_na(const struct flow *ip_flow, const struct match *md,
+pinctrl_handle_nd_na(struct rconn *swconn, const struct flow *ip_flow,
+                     const struct match *md,
                      struct ofpbuf *userdata, bool is_router)
 {
     /* This action only works for IPv6 ND packets, and the switch should only
@@ -2644,12 +3023,14 @@  pinctrl_handle_nd_na(const struct flow *ip_flow, const struct match *md,
                   htonl(rso_flags));
 
     /* Reload previous packet metadata and set actions from userdata. */
-    set_actions_and_enqueue_msg(&packet, md, userdata);
+    set_actions_and_enqueue_msg(swconn, &packet, md, userdata);
     dp_packet_uninit(&packet);
 }
 
+/* Called within the pinctrl_handler thread context. */
 static void
-pinctrl_handle_nd_ns(const struct flow *ip_flow, struct dp_packet *pkt_in,
+pinctrl_handle_nd_ns(struct rconn *swconn, const struct flow *ip_flow,
+                     struct dp_packet *pkt_in,
                      const struct match *md, struct ofpbuf *userdata)
 {
     /* This action only works for IPv6 packets. */
@@ -2669,12 +3050,14 @@  pinctrl_handle_nd_ns(const struct flow *ip_flow, struct dp_packet *pkt_in,
                   &ip_flow->ipv6_dst);
 
     /* Reload previous packet metadata and set actions from userdata. */
-    set_actions_and_enqueue_msg(&packet, md, userdata);
+    set_actions_and_enqueue_msg(swconn, &packet, md, userdata);
     dp_packet_uninit(&packet);
 }
 
+/* Called within the pinctrl_handler thread context. */
 static void
 pinctrl_handle_put_nd_ra_opts(
+    struct rconn *swconn,
     const struct flow *in_flow, struct dp_packet *pkt_in,
     struct ofputil_packet_in *pin, struct ofpbuf *userdata,
     struct ofpbuf *continuation)
@@ -2756,6 +3139,6 @@  exit:
         sv.u8_val = success;
         mf_write_subfield(&dst, &sv, &pin->flow_metadata);
     }
-    queue_msg(ofputil_encode_resume(pin, continuation, proto));
+    queue_msg(swconn, ofputil_encode_resume(pin, continuation, proto));
     dp_packet_uninit(pkt_out_ptr);
 }