diff mbox series

[ovs-dev,v4,3/3] ovn-controller: Use separate thread for packet-in processing.

Message ID 1503893682-65314-3-git-send-email-zhouhan@gmail.com
State Deferred
Headers show
Series [ovs-dev,v4,1/3] ovn-controller: readonly mode binding_run and get_br_int | expand

Commit Message

Han Zhou Aug. 28, 2017, 4:14 a.m. UTC
This patch introduces multi-threading for ovn-controller and use
dedicated thread for packet-in processing as a start. It decouples
packet-in processing and ovs flow computing, so that packet-in inputs
won't trigger flow recomputing, and flow computing won't block
packet-in processing. In large scale environment this largely reduces
CPU cost and improves performance.

Related effort and discussion:
https://mail.openvswitch.org/pipermail/ovs-dev/2017-May/331813.html

Signed-off-by: Han Zhou <zhouhan@gmail.com>
---
v3->v4: rebased on master.

 ovn/controller/ovn-controller.c |  71 ++++++++++++++++++---------
 ovn/controller/ovn-controller.h |  38 +++++++++++++++
 ovn/controller/pinctrl.c        | 105 ++++++++++++++++++++++++++++++++++++++++
 ovn/controller/pinctrl.h        |   1 +
 4 files changed, 193 insertions(+), 22 deletions(-)

Comments

Guoshuai Li Sept. 27, 2017, 12:08 a.m. UTC | #1
This is very useful to me.

I found a problem in my use:
In the ovn-controller and the south of the database connection,when the 
ovn-controller as a passive service, the SB as a client, such as 
configured to

ovs-vsctl set Open_vSwitch . external-ids:ovn-remote=ptcp:6644:0.0.0.0
ovn-sbctl set-connection  tcp:10.157.145.211:6644 
tcp:10.157.145.212:6644 tcp:10.157.145.213:6644 tcp:10.157.145.214:6644

This configuration is to determine the status of the chassis through the 
state of the connection in the OVNSB。


But here the two threads listening conflict:
2017-09-27T00:00:59.387Z|26144|socket_util|ERR|6644:0.0.0.0: bind: 
Address already in use
2017-09-27T00:00:59.387Z|26145|reconnect|INFO|ptcp:6644:0.0.0.0: 
listening...
2017-09-27T00:00:59.387Z|26146|reconnect|INFO|ptcp:6644:0.0.0.0: listen 
attempt failed (Address already in use)

Do you have a good way to fix it?

And this patch also conflict with master...


  2017/8/28 12:14, Han Zhou :
> This patch introduces multi-threading for ovn-controller and use
> dedicated thread for packet-in processing as a start. It decouples
> packet-in processing and ovs flow computing, so that packet-in inputs
> won't trigger flow recomputing, and flow computing won't block
> packet-in processing. In large scale environment this largely reduces
> CPU cost and improves performance.
>
> Related effort and discussion:
> https://mail.openvswitch.org/pipermail/ovs-dev/2017-May/331813.html
>
> Signed-off-by: Han Zhou <zhouhan@gmail.com>
> ---
> v3->v4: rebased on master.
>
>   ovn/controller/ovn-controller.c |  71 ++++++++++++++++++---------
>   ovn/controller/ovn-controller.h |  38 +++++++++++++++
>   ovn/controller/pinctrl.c        | 105 ++++++++++++++++++++++++++++++++++++++++
>   ovn/controller/pinctrl.h        |   1 +
>   4 files changed, 193 insertions(+), 22 deletions(-)
>
> diff --git a/ovn/controller/ovn-controller.c b/ovn/controller/ovn-controller.c
> index 414443f..cb04244 100644
> --- a/ovn/controller/ovn-controller.c
> +++ b/ovn/controller/ovn-controller.c
> @@ -56,6 +56,8 @@
>   #include "stream.h"
>   #include "unixctl.h"
>   #include "util.h"
> +#include "latch.h"
> +#include "ovs-thread.h"
>   
>   VLOG_DEFINE_THIS_MODULE(main);
>   
> @@ -66,8 +68,6 @@ static unixctl_cb_func inject_pkt;
>   #define DEFAULT_BRIDGE_NAME "br-int"
>   #define DEFAULT_PROBE_INTERVAL_MSEC 5000
>   
> -static void update_probe_interval(struct controller_ctx *,
> -                                  const char *ovnsb_remote);
>   static void parse_options(int argc, char *argv[]);
>   OVS_NO_RETURN static void usage(void);
>   
> @@ -78,7 +78,7 @@ struct pending_pkt {
>       char *flow_s;
>   };
>   
> -static char *ovs_remote;
> +char *ovs_remote;
>   
>   struct local_datapath *
>   get_local_datapath(const struct hmap *local_datapaths, uint32_t tunnel_key)
> @@ -129,7 +129,7 @@ get_bridge(struct ovsdb_idl *ovs_idl, const char *br_name)
>       return NULL;
>   }
>   
> -static void
> +void
>   update_sb_monitors(struct ovsdb_idl *ovnsb_idl,
>                      const struct sbrec_chassis *chassis,
>                      const struct sset *local_ifaces,
> @@ -257,7 +257,7 @@ create_br_int(struct controller_ctx *ctx)
>       return bridge;
>   }
>   
> -static const struct ovsrec_bridge *
> +const struct ovsrec_bridge *
>   get_br_int(struct controller_ctx *ctx)
>   {
>       const struct ovsrec_open_vswitch *cfg;
> @@ -269,7 +269,7 @@ get_br_int(struct controller_ctx *ctx)
>       return get_bridge(ctx->ovs_idl, br_int_name(cfg));
>   }
>   
> -static const char *
> +const char *
>   get_chassis_id(const struct ovsdb_idl *ovs_idl)
>   {
>       const struct ovsrec_open_vswitch *cfg = ovsrec_open_vswitch_first(ovs_idl);
> @@ -309,7 +309,7 @@ update_ssl_config(const struct ovsdb_idl *ovs_idl)
>   
>   /* Retrieves the OVN Southbound remote location from the
>    * "external-ids:ovn-remote" key in 'ovs_idl' and returns a copy of it. */
> -static char *
> +char *
>   get_ovnsb_remote(struct ovsdb_idl *ovs_idl)
>   {
>       while (1) {
> @@ -498,6 +498,22 @@ get_nb_cfg(struct ovsdb_idl *idl)
>   }
>   
>   static void
> +ctrl_thread_create(struct ctrl_thread *thread, const char *name,
> +    void *(*start)(void *))
> +{
> +    latch_init(&thread->exit_latch);
> +    thread->thread = ovs_thread_create(name, start, thread);
> +}
> +
> +static void
> +ctrl_thread_exit(struct ctrl_thread *thread)
> +{
> +    latch_set(&thread->exit_latch);
> +    xpthread_join(thread->thread, NULL);
> +    latch_destroy(&thread->exit_latch);
> +}
> +
> +void
>   ctrl_register_ovs_idl(struct ovsdb_idl *ovs_idl)
>   {
>       /* We do not monitor all tables by default, so modules must register
> @@ -574,6 +590,22 @@ create_ovnsb_indexes(struct ovsdb_idl *ovnsb_idl)
>                                  OVSDB_INDEX_ASC, NULL);
>   }
>   
> +void
> +connect_ovnsb(struct ovsdb_idl_loop *ovnsb_idl_loop,
> +              struct ovnsb_cursors *cursors,
> +              const char *ovnsb_remote)
> +{
> +    ovnsb_idl_loop->idl = ovsdb_idl_create(ovnsb_remote,
> +            &sbrec_idl_class, true, true);
> +
> +    create_ovnsb_indexes(ovnsb_idl_loop->idl);
> +    lport_init(cursors, ovnsb_idl_loop->idl);
> +
> +    ovsdb_idl_omit_alert(ovnsb_idl_loop->idl, &sbrec_chassis_col_nb_cfg);
> +    update_sb_monitors(ovnsb_idl_loop->idl, NULL, NULL, NULL);
> +    ovsdb_idl_get_initial_snapshot(ovnsb_idl_loop->idl);
> +}
> +
>   int
>   main(int argc, char *argv[])
>   {
> @@ -605,7 +637,6 @@ main(int argc, char *argv[])
>       daemonize_complete();
>   
>       ofctrl_init(&group_table);
> -    pinctrl_init();
>       lflow_init();
>   
>       /* Connect to OVS OVSDB instance. */
> @@ -616,16 +647,9 @@ main(int argc, char *argv[])
>   
>       /* Connect to OVN SB database and get a snapshot. */
>       char *ovnsb_remote = get_ovnsb_remote(ovs_idl_loop.idl);
> -    struct ovsdb_idl_loop ovnsb_idl_loop = OVSDB_IDL_LOOP_INITIALIZER(
> -        ovsdb_idl_create(ovnsb_remote, &sbrec_idl_class, true, true));
> -
> -    create_ovnsb_indexes(ovnsb_idl_loop.idl);
> +    struct ovsdb_idl_loop ovnsb_idl_loop = OVSDB_IDL_LOOP_INITIALIZER(NULL);
>       struct ovnsb_cursors ovnsb_cursors;
> -    lport_init(ovnsb_cursors, ovnsb_idl_loop.idl);
> -
> -    ovsdb_idl_omit_alert(ovnsb_idl_loop.idl, &sbrec_chassis_col_nb_cfg);
> -    update_sb_monitors(ovnsb_idl_loop.idl, NULL, NULL, NULL);
> -    ovsdb_idl_get_initial_snapshot(ovnsb_idl_loop.idl);
> +    connect_ovnsb(&ovnsb_idl_loop, &ovnsb_cursors, ovnsb_remote);
>   
>       /* Initialize connection tracking zones. */
>       struct simap ct_zones = SIMAP_INITIALIZER(&ct_zones);
> @@ -641,6 +665,10 @@ main(int argc, char *argv[])
>       unixctl_command_register("inject-pkt", "MICROFLOW", 1, 1, inject_pkt,
>                                &pending_pkt);
>   
> +
> +    struct ctrl_thread pinctrl_thread;
> +    ctrl_thread_create(&pinctrl_thread, "pinctrl", pinctrl_thread_main);
> +
>       /* Main loop. */
>       exiting = false;
>       while (!exiting) {
> @@ -705,8 +733,6 @@ main(int argc, char *argv[])
>               enum mf_field_id mff_ovn_geneve = ofctrl_run(br_int,
>                                                            &pending_ct_zones);
>   
> -            pinctrl_run(&ctx, br_int, chassis, &chassis_index,
> -                        &local_datapaths, &active_tunnels);
>               update_ct_zones(&local_lports, &local_datapaths, &ct_zones,
>                               ct_zone_bitmap, &pending_ct_zones);
>               if (ctx.ovs_idl_txn) {
> @@ -791,7 +817,6 @@ main(int argc, char *argv[])
>   
>           if (br_int) {
>               ofctrl_wait();
> -            pinctrl_wait(&ctx);
>           }
>           ovsdb_idl_loop_commit_and_wait(&ovnsb_idl_loop);
>   
> @@ -840,10 +865,12 @@ main(int argc, char *argv[])
>           poll_block();
>       }
>   
> +    /* stop child controller threads */
> +    ctrl_thread_exit(&pinctrl_thread);
> +
>       unixctl_server_destroy(unixctl);
>       lflow_destroy();
>       ofctrl_destroy();
> -    pinctrl_destroy();
>   
>       simap_destroy(&ct_zones);
>   
> @@ -1000,7 +1027,7 @@ inject_pkt(struct unixctl_conn *conn, int argc OVS_UNUSED,
>   
>   /* Get the desired SB probe timer from the OVS database and configure it into
>    * the SB database. */
> -static void
> +void
>   update_probe_interval(struct controller_ctx *ctx, const char *ovnsb_remote)
>   {
>       const struct ovsrec_open_vswitch *cfg
> diff --git a/ovn/controller/ovn-controller.h b/ovn/controller/ovn-controller.h
> index f57c557..1234c2b 100644
> --- a/ovn/controller/ovn-controller.h
> +++ b/ovn/controller/ovn-controller.h
> @@ -18,7 +18,9 @@
>   #define OVN_CONTROLLER_H 1
>   
>   #include "simap.h"
> +#include "sset.h"
>   #include "ovn/lib/ovn-sb-idl.h"
> +#include "latch.h"
>   
>   /* Linux supports a maximum of 64K zones, which seems like a fine default. */
>   #define MAX_CT_ZONES 65535
> @@ -71,6 +73,13 @@ struct local_datapath {
>       size_t n_peer_dps;
>   };
>   
> +struct ctrl_thread {
> +    pthread_t thread;
> +
> +    /* Controls thread exit. */
> +    struct latch exit_latch;
> +};
> +
>   struct local_datapath *get_local_datapath(const struct hmap *,
>                                             uint32_t tunnel_key);
>   
> @@ -90,5 +99,34 @@ enum chassis_tunnel_type {
>   
>   uint32_t get_tunnel_type(const char *name);
>   
> +/* Retrieves the OVN Southbound remote location from the
> + * "external-ids:ovn-remote" key in 'ovs_idl' and returns a copy of it. */
> +char *get_ovnsb_remote(struct ovsdb_idl *ovs_idl);
> +
> +void
> +update_sb_monitors(struct ovsdb_idl *ovnsb_idl,
> +                   const struct sbrec_chassis *chassis,
> +                   const struct sset *local_ifaces,
> +                   struct hmap *local_datapaths);
> +
> +/* Get the desired SB probe timer from the OVS database and configure it into
> + * the SB database. */
> +void
> +update_probe_interval(struct controller_ctx *ctx, const char *ovnsb_remote);
> +
> +const struct ovsrec_bridge *
> +get_br_int(struct controller_ctx *ctx);
> +
> +const char *
> +get_chassis_id(const struct ovsdb_idl *ovs_idl);
> +
> +void
> +ctrl_register_ovs_idl(struct ovsdb_idl *ovs_idl);
> +
> +void
> +connect_ovnsb(struct ovsdb_idl_loop *ovnsb_idl_loop,
> +              struct ovnsb_cursors *cursors,
> +              const char *ovnsb_remote);
>   
> +extern char *ovs_remote;
>   #endif /* ovn/ovn-controller.h */
> diff --git a/ovn/controller/pinctrl.c b/ovn/controller/pinctrl.c
> index 9412b48..8a30be2 100644
> --- a/ovn/controller/pinctrl.c
> +++ b/ovn/controller/pinctrl.c
> @@ -18,6 +18,7 @@
>   
>   #include "pinctrl.h"
>   
> +#include "bfd.h"
>   #include "coverage.h"
>   #include "csum.h"
>   #include "dirs.h"
> @@ -41,6 +42,7 @@
>   #include "ovn/lex.h"
>   #include "ovn/lib/acl-log.h"
>   #include "ovn/lib/logical-fields.h"
> +#include "ovn/lib/chassis-index.h"
>   #include "ovn/lib/ovn-dhcp.h"
>   #include "ovn/lib/ovn-util.h"
>   #include "poll-loop.h"
> @@ -48,6 +50,8 @@
>   #include "socket-util.h"
>   #include "timeval.h"
>   #include "vswitch-idl.h"
> +#include "latch.h"
> +#include "binding.h"
>   
>   VLOG_DEFINE_THIS_MODULE(pinctrl);
>   
> @@ -84,6 +88,107 @@ static void reload_metadata(struct ofpbuf *ofpacts,
>   
>   COVERAGE_DEFINE(pinctrl_drop_put_mac_binding);
>   
> +void *
> +pinctrl_thread_main(void *arg)
> +{
> +    struct ctrl_thread *thread = arg;
> +    pinctrl_init();
> +
> +    /* Connect to OVS OVSDB instance. */
> +    struct ovsdb_idl_loop ovs_idl_loop = OVSDB_IDL_LOOP_INITIALIZER(
> +        ovsdb_idl_create(ovs_remote, &ovsrec_idl_class, false, true));
> +    ctrl_register_ovs_idl(ovs_idl_loop.idl);
> +    ovsdb_idl_get_initial_snapshot(ovs_idl_loop.idl);
> +
> +    /* Connect to OVN SB database and get a snapshot. */
> +    char *ovnsb_remote = get_ovnsb_remote(ovs_idl_loop.idl);
> +    struct ovsdb_idl_loop ovnsb_idl_loop = OVSDB_IDL_LOOP_INITIALIZER(NULL);
> +    struct ovnsb_cursors ovnsb_cursors;
> +    connect_ovnsb(&ovnsb_idl_loop, &ovnsb_cursors, ovnsb_remote);
> +
> +    while (!latch_is_set(&thread->exit_latch)) {
> +        /* Below logic is similar as in main loop in ovn-controller.c,
> +         * while the purpose here is packet-in processing only */
> +        char *new_ovnsb_remote = get_ovnsb_remote(ovs_idl_loop.idl);
> +        if (strcmp(ovnsb_remote, new_ovnsb_remote)) {
> +            free(ovnsb_remote);
> +            ovnsb_remote = new_ovnsb_remote;
> +            ovsdb_idl_set_remote(ovnsb_idl_loop.idl, ovnsb_remote, true);
> +        } else {
> +            free(new_ovnsb_remote);
> +        }
> +
> +        struct controller_ctx ctx = {
> +            .ovs_idl = ovs_idl_loop.idl,
> +            .ovs_idl_txn = ovsdb_idl_loop_run(&ovs_idl_loop),
> +            .ovnsb_idl = ovnsb_idl_loop.idl,
> +            .ovnsb_idl_txn = ovsdb_idl_loop_run(&ovnsb_idl_loop),
> +            .ovnsb_cursors = &ovnsb_cursors,
> +        };
> +
> +        update_probe_interval(&ctx, ovnsb_remote);
> +
> +        struct hmap local_datapaths = HMAP_INITIALIZER(&local_datapaths);
> +        struct sset local_lports = SSET_INITIALIZER(&local_lports);
> +        struct sset active_tunnels = SSET_INITIALIZER(&active_tunnels);
> +        const struct ovsrec_bridge *br_int = get_br_int(&ctx);
> +        const char *chassis_id = get_chassis_id(ctx.ovs_idl);
> +
> +        struct chassis_index chassis_index;
> +
> +        chassis_index_init(&chassis_index, ctx.ovnsb_idl);
> +
> +        if (ctx.ovnsb_idl_txn) {
> +            const struct sbrec_chassis *chassis = NULL;
> +            if (chassis_id) {
> +                chassis = get_chassis(ctx.ovnsb_idl, chassis_id);
> +                bfd_calculate_active_tunnels(br_int, &active_tunnels);
> +                binding_get(&ctx, br_int, chassis,
> +                            &chassis_index, &active_tunnels, &local_datapaths,
> +                            &local_lports);
> +            }
> +
> +            if (br_int && chassis) {
> +                pinctrl_run(&ctx, br_int, chassis, &chassis_index,
> +                            &local_datapaths, &active_tunnels);
> +                update_sb_monitors(ctx.ovnsb_idl, chassis,
> +                                   &local_lports, &local_datapaths);
> +            }
> +        }
> +
> +        chassis_index_destroy(&chassis_index);
> +        sset_destroy(&local_lports);
> +        sset_destroy(&active_tunnels);
> +
> +        struct local_datapath *cur_node, *next_node;
> +        HMAP_FOR_EACH_SAFE (cur_node, next_node, hmap_node, &local_datapaths) {
> +            free(cur_node->peer_dps);
> +            hmap_remove(&local_datapaths, &cur_node->hmap_node);
> +            free(cur_node);
> +        }
> +        hmap_destroy(&local_datapaths);
> +
> +        if (br_int) {
> +            pinctrl_wait(&ctx);
> +        }
> +        ovsdb_idl_loop_commit_and_wait(&ovnsb_idl_loop);
> +        ovsdb_idl_loop_commit_and_wait(&ovs_idl_loop);
> +
> +        latch_wait(&thread->exit_latch);
> +        poll_block();
> +    }
> +
> +    pinctrl_destroy();
> +
> +    ovsdb_idl_loop_destroy(&ovs_idl_loop);
> +    ovsdb_idl_loop_destroy(&ovnsb_idl_loop);
> +
> +    free(ovnsb_remote);
> +
> +    VLOG_INFO("pinctrl thread done");
> +    return NULL;
> +}
> +
>   void
>   pinctrl_init(void)
>   {
> diff --git a/ovn/controller/pinctrl.h b/ovn/controller/pinctrl.h
> index fc9cca8..15b12a0 100644
> --- a/ovn/controller/pinctrl.h
> +++ b/ovn/controller/pinctrl.h
> @@ -34,6 +34,7 @@ void pinctrl_run(struct controller_ctx *,
>                    const struct ovsrec_bridge *, const struct sbrec_chassis *,
>                    const struct chassis_index *, struct hmap *local_datapaths,
>                    struct sset *active_tunnels);
> +void *pinctrl_thread_main(void *arg);
>   void pinctrl_wait(struct controller_ctx *);
>   void pinctrl_destroy(void);
>
Han Zhou Sept. 27, 2017, 10:37 p.m. UTC | #2
Hi Guoshuai,

Thanks for the feedback. I am glad it is useful for you. And you have a
very valid point that the current patch breaks the connection status
feature. I didn't think about the detailed fix yet but in general I think
it might be reasonable to add a "proxy" layer of HV-local SB DB, which will
be solve the problem you mentioned and also save the bandwidth, without
doubling the burden of the central SB DB. What do you think?

Thanks,
Han

On Tue, Sep 26, 2017 at 5:08 PM, Guoshuai Li <ligs@dtdream.com> wrote:
>
> This is very useful to me.
>
> I found a problem in my use:
> In the ovn-controller and the south of the database connection,when the
ovn-controller as a passive service, the SB as a client, such as configured
to
>
> ovs-vsctl set Open_vSwitch . external-ids:ovn-remote=ptcp:6644:0.0.0.0
> ovn-sbctl set-connection  tcp:10.157.145.211:6644 tcp:10.157.145.212:6644
tcp:10.157.145.213:6644 tcp:10.157.145.214:6644
>
> This configuration is to determine the status of the chassis through the
state of the connection in the OVNSB。
>
>
> But here the two threads listening conflict:
> 2017-09-27T00:00:59.387Z|26144|socket_util|ERR|6644:0.0.0.0: bind:
Address already in use
> 2017-09-27T00:00:59.387Z|26145|reconnect|INFO|ptcp:6644:0.0.0.0:
listening...
> 2017-09-27T00:00:59.387Z|26146|reconnect|INFO|ptcp:6644:0.0.0.0: listen
attempt failed (Address already in use)
>
> Do you have a good way to fix it?
>
> And this patch also conflict with master...
>
>
>  2017/8/28 12:14, Han Zhou :
>
>> This patch introduces multi-threading for ovn-controller and use
>> dedicated thread for packet-in processing as a start. It decouples
>> packet-in processing and ovs flow computing, so that packet-in inputs
>> won't trigger flow recomputing, and flow computing won't block
>> packet-in processing. In large scale environment this largely reduces
>> CPU cost and improves performance.
>>
>> Related effort and discussion:
>> https://mail.openvswitch.org/pipermail/ovs-dev/2017-May/331813.html
>>
>> Signed-off-by: Han Zhou <zhouhan@gmail.com>
>> ---
>> v3->v4: rebased on master.
>>
>>   ovn/controller/ovn-controller.c |  71 ++++++++++++++++++---------
>>   ovn/controller/ovn-controller.h |  38 +++++++++++++++
>>   ovn/controller/pinctrl.c        | 105
++++++++++++++++++++++++++++++++++++++++
>>   ovn/controller/pinctrl.h        |   1 +
>>   4 files changed, 193 insertions(+), 22 deletions(-)
>>
>> diff --git a/ovn/controller/ovn-controller.c
b/ovn/controller/ovn-controller.c
>> index 414443f..cb04244 100644
>> --- a/ovn/controller/ovn-controller.c
>> +++ b/ovn/controller/ovn-controller.c
>> @@ -56,6 +56,8 @@
>>   #include "stream.h"
>>   #include "unixctl.h"
>>   #include "util.h"
>> +#include "latch.h"
>> +#include "ovs-thread.h"
>>     VLOG_DEFINE_THIS_MODULE(main);
>>   @@ -66,8 +68,6 @@ static unixctl_cb_func inject_pkt;
>>   #define DEFAULT_BRIDGE_NAME "br-int"
>>   #define DEFAULT_PROBE_INTERVAL_MSEC 5000
>>   -static void update_probe_interval(struct controller_ctx *,
>> -                                  const char *ovnsb_remote);
>>   static void parse_options(int argc, char *argv[]);
>>   OVS_NO_RETURN static void usage(void);
>>   @@ -78,7 +78,7 @@ struct pending_pkt {
>>       char *flow_s;
>>   };
>>   -static char *ovs_remote;
>> +char *ovs_remote;
>>     struct local_datapath *
>>   get_local_datapath(const struct hmap *local_datapaths, uint32_t
tunnel_key)
>> @@ -129,7 +129,7 @@ get_bridge(struct ovsdb_idl *ovs_idl, const char
*br_name)
>>       return NULL;
>>   }
>>   -static void
>> +void
>>   update_sb_monitors(struct ovsdb_idl *ovnsb_idl,
>>                      const struct sbrec_chassis *chassis,
>>                      const struct sset *local_ifaces,
>> @@ -257,7 +257,7 @@ create_br_int(struct controller_ctx *ctx)
>>       return bridge;
>>   }
>>   -static const struct ovsrec_bridge *
>> +const struct ovsrec_bridge *
>>   get_br_int(struct controller_ctx *ctx)
>>   {
>>       const struct ovsrec_open_vswitch *cfg;
>> @@ -269,7 +269,7 @@ get_br_int(struct controller_ctx *ctx)
>>       return get_bridge(ctx->ovs_idl, br_int_name(cfg));
>>   }
>>   -static const char *
>> +const char *
>>   get_chassis_id(const struct ovsdb_idl *ovs_idl)
>>   {
>>       const struct ovsrec_open_vswitch *cfg =
ovsrec_open_vswitch_first(ovs_idl);
>> @@ -309,7 +309,7 @@ update_ssl_config(const struct ovsdb_idl *ovs_idl)
>>     /* Retrieves the OVN Southbound remote location from the
>>    * "external-ids:ovn-remote" key in 'ovs_idl' and returns a copy of
it. */
>> -static char *
>> +char *
>>   get_ovnsb_remote(struct ovsdb_idl *ovs_idl)
>>   {
>>       while (1) {
>> @@ -498,6 +498,22 @@ get_nb_cfg(struct ovsdb_idl *idl)
>>   }
>>     static void
>> +ctrl_thread_create(struct ctrl_thread *thread, const char *name,
>> +    void *(*start)(void *))
>> +{
>> +    latch_init(&thread->exit_latch);
>> +    thread->thread = ovs_thread_create(name, start, thread);
>> +}
>> +
>> +static void
>> +ctrl_thread_exit(struct ctrl_thread *thread)
>> +{
>> +    latch_set(&thread->exit_latch);
>> +    xpthread_join(thread->thread, NULL);
>> +    latch_destroy(&thread->exit_latch);
>> +}
>> +
>> +void
>>   ctrl_register_ovs_idl(struct ovsdb_idl *ovs_idl)
>>   {
>>       /* We do not monitor all tables by default, so modules must
register
>> @@ -574,6 +590,22 @@ create_ovnsb_indexes(struct ovsdb_idl *ovnsb_idl)
>>                                  OVSDB_INDEX_ASC, NULL);
>>   }
>>   +void
>> +connect_ovnsb(struct ovsdb_idl_loop *ovnsb_idl_loop,
>> +              struct ovnsb_cursors *cursors,
>> +              const char *ovnsb_remote)
>> +{
>> +    ovnsb_idl_loop->idl = ovsdb_idl_create(ovnsb_remote,
>> +            &sbrec_idl_class, true, true);
>> +
>> +    create_ovnsb_indexes(ovnsb_idl_loop->idl);
>> +    lport_init(cursors, ovnsb_idl_loop->idl);
>> +
>> +    ovsdb_idl_omit_alert(ovnsb_idl_loop->idl,
&sbrec_chassis_col_nb_cfg);
>> +    update_sb_monitors(ovnsb_idl_loop->idl, NULL, NULL, NULL);
>> +    ovsdb_idl_get_initial_snapshot(ovnsb_idl_loop->idl);
>> +}
>> +
>>   int
>>   main(int argc, char *argv[])
>>   {
>> @@ -605,7 +637,6 @@ main(int argc, char *argv[])
>>       daemonize_complete();
>>         ofctrl_init(&group_table);
>> -    pinctrl_init();
>>       lflow_init();
>>         /* Connect to OVS OVSDB instance. */
>> @@ -616,16 +647,9 @@ main(int argc, char *argv[])
>>         /* Connect to OVN SB database and get a snapshot. */
>>       char *ovnsb_remote = get_ovnsb_remote(ovs_idl_loop.idl);
>> -    struct ovsdb_idl_loop ovnsb_idl_loop = OVSDB_IDL_LOOP_INITIALIZER(
>> -        ovsdb_idl_create(ovnsb_remote, &sbrec_idl_class, true, true));
>> -
>> -    create_ovnsb_indexes(ovnsb_idl_loop.idl);
>> +    struct ovsdb_idl_loop ovnsb_idl_loop =
OVSDB_IDL_LOOP_INITIALIZER(NULL);
>>       struct ovnsb_cursors ovnsb_cursors;
>> -    lport_init(ovnsb_cursors, ovnsb_idl_loop.idl);
>> -
>> -    ovsdb_idl_omit_alert(ovnsb_idl_loop.idl, &sbrec_chassis_col_nb_cfg);
>> -    update_sb_monitors(ovnsb_idl_loop.idl, NULL, NULL, NULL);
>> -    ovsdb_idl_get_initial_snapshot(ovnsb_idl_loop.idl);
>> +    connect_ovnsb(&ovnsb_idl_loop, &ovnsb_cursors, ovnsb_remote);
>>         /* Initialize connection tracking zones. */
>>       struct simap ct_zones = SIMAP_INITIALIZER(&ct_zones);
>> @@ -641,6 +665,10 @@ main(int argc, char *argv[])
>>       unixctl_command_register("inject-pkt", "MICROFLOW", 1, 1,
inject_pkt,
>>                                &pending_pkt);
>>   +
>> +    struct ctrl_thread pinctrl_thread;
>> +    ctrl_thread_create(&pinctrl_thread, "pinctrl", pinctrl_thread_main);
>> +
>>       /* Main loop. */
>>       exiting = false;
>>       while (!exiting) {
>> @@ -705,8 +733,6 @@ main(int argc, char *argv[])
>>               enum mf_field_id mff_ovn_geneve = ofctrl_run(br_int,
>>
 &pending_ct_zones);
>>   -            pinctrl_run(&ctx, br_int, chassis, &chassis_index,
>> -                        &local_datapaths, &active_tunnels);
>>               update_ct_zones(&local_lports, &local_datapaths, &ct_zones,
>>                               ct_zone_bitmap, &pending_ct_zones);
>>               if (ctx.ovs_idl_txn) {
>> @@ -791,7 +817,6 @@ main(int argc, char *argv[])
>>             if (br_int) {
>>               ofctrl_wait();
>> -            pinctrl_wait(&ctx);
>>           }
>>           ovsdb_idl_loop_commit_and_wait(&ovnsb_idl_loop);
>>   @@ -840,10 +865,12 @@ main(int argc, char *argv[])
>>           poll_block();
>>       }
>>   +    /* stop child controller threads */
>> +    ctrl_thread_exit(&pinctrl_thread);
>> +
>>       unixctl_server_destroy(unixctl);
>>       lflow_destroy();
>>       ofctrl_destroy();
>> -    pinctrl_destroy();
>>         simap_destroy(&ct_zones);
>>   @@ -1000,7 +1027,7 @@ inject_pkt(struct unixctl_conn *conn, int argc
OVS_UNUSED,
>>     /* Get the desired SB probe timer from the OVS database and
configure it into
>>    * the SB database. */
>> -static void
>> +void
>>   update_probe_interval(struct controller_ctx *ctx, const char
*ovnsb_remote)
>>   {
>>       const struct ovsrec_open_vswitch *cfg
>> diff --git a/ovn/controller/ovn-controller.h
b/ovn/controller/ovn-controller.h
>> index f57c557..1234c2b 100644
>> --- a/ovn/controller/ovn-controller.h
>> +++ b/ovn/controller/ovn-controller.h
>> @@ -18,7 +18,9 @@
>>   #define OVN_CONTROLLER_H 1
>>     #include "simap.h"
>> +#include "sset.h"
>>   #include "ovn/lib/ovn-sb-idl.h"
>> +#include "latch.h"
>>     /* Linux supports a maximum of 64K zones, which seems like a fine
default. */
>>   #define MAX_CT_ZONES 65535
>> @@ -71,6 +73,13 @@ struct local_datapath {
>>       size_t n_peer_dps;
>>   };
>>   +struct ctrl_thread {
>> +    pthread_t thread;
>> +
>> +    /* Controls thread exit. */
>> +    struct latch exit_latch;
>> +};
>> +
>>   struct local_datapath *get_local_datapath(const struct hmap *,
>>                                             uint32_t tunnel_key);
>>   @@ -90,5 +99,34 @@ enum chassis_tunnel_type {
>>     uint32_t get_tunnel_type(const char *name);
>>   +/* Retrieves the OVN Southbound remote location from the
>> + * "external-ids:ovn-remote" key in 'ovs_idl' and returns a copy of it.
*/
>> +char *get_ovnsb_remote(struct ovsdb_idl *ovs_idl);
>> +
>> +void
>> +update_sb_monitors(struct ovsdb_idl *ovnsb_idl,
>> +                   const struct sbrec_chassis *chassis,
>> +                   const struct sset *local_ifaces,
>> +                   struct hmap *local_datapaths);
>> +
>> +/* Get the desired SB probe timer from the OVS database and configure
it into
>> + * the SB database. */
>> +void
>> +update_probe_interval(struct controller_ctx *ctx, const char
*ovnsb_remote);
>> +
>> +const struct ovsrec_bridge *
>> +get_br_int(struct controller_ctx *ctx);
>> +
>> +const char *
>> +get_chassis_id(const struct ovsdb_idl *ovs_idl);
>> +
>> +void
>> +ctrl_register_ovs_idl(struct ovsdb_idl *ovs_idl);
>> +
>> +void
>> +connect_ovnsb(struct ovsdb_idl_loop *ovnsb_idl_loop,
>> +              struct ovnsb_cursors *cursors,
>> +              const char *ovnsb_remote);
>>   +extern char *ovs_remote;
>>   #endif /* ovn/ovn-controller.h */
>> diff --git a/ovn/controller/pinctrl.c b/ovn/controller/pinctrl.c
>> index 9412b48..8a30be2 100644
>> --- a/ovn/controller/pinctrl.c
>> +++ b/ovn/controller/pinctrl.c
>> @@ -18,6 +18,7 @@
>>     #include "pinctrl.h"
>>   +#include "bfd.h"
>>   #include "coverage.h"
>>   #include "csum.h"
>>   #include "dirs.h"
>> @@ -41,6 +42,7 @@
>>   #include "ovn/lex.h"
>>   #include "ovn/lib/acl-log.h"
>>   #include "ovn/lib/logical-fields.h"
>> +#include "ovn/lib/chassis-index.h"
>>   #include "ovn/lib/ovn-dhcp.h"
>>   #include "ovn/lib/ovn-util.h"
>>   #include "poll-loop.h"
>> @@ -48,6 +50,8 @@
>>   #include "socket-util.h"
>>   #include "timeval.h"
>>   #include "vswitch-idl.h"
>> +#include "latch.h"
>> +#include "binding.h"
>>     VLOG_DEFINE_THIS_MODULE(pinctrl);
>>   @@ -84,6 +88,107 @@ static void reload_metadata(struct ofpbuf *ofpacts,
>>     COVERAGE_DEFINE(pinctrl_drop_put_mac_binding);
>>   +void *
>> +pinctrl_thread_main(void *arg)
>> +{
>> +    struct ctrl_thread *thread = arg;
>> +    pinctrl_init();
>> +
>> +    /* Connect to OVS OVSDB instance. */
>> +    struct ovsdb_idl_loop ovs_idl_loop = OVSDB_IDL_LOOP_INITIALIZER(
>> +        ovsdb_idl_create(ovs_remote, &ovsrec_idl_class, false, true));
>> +    ctrl_register_ovs_idl(ovs_idl_loop.idl);
>> +    ovsdb_idl_get_initial_snapshot(ovs_idl_loop.idl);
>> +
>> +    /* Connect to OVN SB database and get a snapshot. */
>> +    char *ovnsb_remote = get_ovnsb_remote(ovs_idl_loop.idl);
>> +    struct ovsdb_idl_loop ovnsb_idl_loop =
OVSDB_IDL_LOOP_INITIALIZER(NULL);
>> +    struct ovnsb_cursors ovnsb_cursors;
>> +    connect_ovnsb(&ovnsb_idl_loop, &ovnsb_cursors, ovnsb_remote);
>> +
>> +    while (!latch_is_set(&thread->exit_latch)) {
>> +        /* Below logic is similar as in main loop in ovn-controller.c,
>> +         * while the purpose here is packet-in processing only */
>> +        char *new_ovnsb_remote = get_ovnsb_remote(ovs_idl_loop.idl);
>> +        if (strcmp(ovnsb_remote, new_ovnsb_remote)) {
>> +            free(ovnsb_remote);
>> +            ovnsb_remote = new_ovnsb_remote;
>> +            ovsdb_idl_set_remote(ovnsb_idl_loop.idl, ovnsb_remote,
true);
>> +        } else {
>> +            free(new_ovnsb_remote);
>> +        }
>> +
>> +        struct controller_ctx ctx = {
>> +            .ovs_idl = ovs_idl_loop.idl,
>> +            .ovs_idl_txn = ovsdb_idl_loop_run(&ovs_idl_loop),
>> +            .ovnsb_idl = ovnsb_idl_loop.idl,
>> +            .ovnsb_idl_txn = ovsdb_idl_loop_run(&ovnsb_idl_loop),
>> +            .ovnsb_cursors = &ovnsb_cursors,
>> +        };
>> +
>> +        update_probe_interval(&ctx, ovnsb_remote);
>> +
>> +        struct hmap local_datapaths =
HMAP_INITIALIZER(&local_datapaths);
>> +        struct sset local_lports = SSET_INITIALIZER(&local_lports);
>> +        struct sset active_tunnels = SSET_INITIALIZER(&active_tunnels);
>> +        const struct ovsrec_bridge *br_int = get_br_int(&ctx);
>> +        const char *chassis_id = get_chassis_id(ctx.ovs_idl);
>> +
>> +        struct chassis_index chassis_index;
>> +
>> +        chassis_index_init(&chassis_index, ctx.ovnsb_idl);
>> +
>> +        if (ctx.ovnsb_idl_txn) {
>> +            const struct sbrec_chassis *chassis = NULL;
>> +            if (chassis_id) {
>> +                chassis = get_chassis(ctx.ovnsb_idl, chassis_id);
>> +                bfd_calculate_active_tunnels(br_int, &active_tunnels);
>> +                binding_get(&ctx, br_int, chassis,
>> +                            &chassis_index, &active_tunnels,
&local_datapaths,
>> +                            &local_lports);
>> +            }
>> +
>> +            if (br_int && chassis) {
>> +                pinctrl_run(&ctx, br_int, chassis, &chassis_index,
>> +                            &local_datapaths, &active_tunnels);
>> +                update_sb_monitors(ctx.ovnsb_idl, chassis,
>> +                                   &local_lports, &local_datapaths);
>> +            }
>> +        }
>> +
>> +        chassis_index_destroy(&chassis_index);
>> +        sset_destroy(&local_lports);
>> +        sset_destroy(&active_tunnels);
>> +
>> +        struct local_datapath *cur_node, *next_node;
>> +        HMAP_FOR_EACH_SAFE (cur_node, next_node, hmap_node,
&local_datapaths) {
>> +            free(cur_node->peer_dps);
>> +            hmap_remove(&local_datapaths, &cur_node->hmap_node);
>> +            free(cur_node);
>> +        }
>> +        hmap_destroy(&local_datapaths);
>> +
>> +        if (br_int) {
>> +            pinctrl_wait(&ctx);
>> +        }
>> +        ovsdb_idl_loop_commit_and_wait(&ovnsb_idl_loop);
>> +        ovsdb_idl_loop_commit_and_wait(&ovs_idl_loop);
>> +
>> +        latch_wait(&thread->exit_latch);
>> +        poll_block();
>> +    }
>> +
>> +    pinctrl_destroy();
>> +
>> +    ovsdb_idl_loop_destroy(&ovs_idl_loop);
>> +    ovsdb_idl_loop_destroy(&ovnsb_idl_loop);
>> +
>> +    free(ovnsb_remote);
>> +
>> +    VLOG_INFO("pinctrl thread done");
>> +    return NULL;
>> +}
>> +
>>   void
>>   pinctrl_init(void)
>>   {
>> diff --git a/ovn/controller/pinctrl.h b/ovn/controller/pinctrl.h
>> index fc9cca8..15b12a0 100644
>> --- a/ovn/controller/pinctrl.h
>> +++ b/ovn/controller/pinctrl.h
>> @@ -34,6 +34,7 @@ void pinctrl_run(struct controller_ctx *,
>>                    const struct ovsrec_bridge *, const struct
sbrec_chassis *,
>>                    const struct chassis_index *, struct hmap
*local_datapaths,
>>                    struct sset *active_tunnels);
>> +void *pinctrl_thread_main(void *arg);
>>   void pinctrl_wait(struct controller_ctx *);
>>   void pinctrl_destroy(void);
>>
>
>
diff mbox series

Patch

diff --git a/ovn/controller/ovn-controller.c b/ovn/controller/ovn-controller.c
index 414443f..cb04244 100644
--- a/ovn/controller/ovn-controller.c
+++ b/ovn/controller/ovn-controller.c
@@ -56,6 +56,8 @@ 
 #include "stream.h"
 #include "unixctl.h"
 #include "util.h"
+#include "latch.h"
+#include "ovs-thread.h"
 
 VLOG_DEFINE_THIS_MODULE(main);
 
@@ -66,8 +68,6 @@  static unixctl_cb_func inject_pkt;
 #define DEFAULT_BRIDGE_NAME "br-int"
 #define DEFAULT_PROBE_INTERVAL_MSEC 5000
 
-static void update_probe_interval(struct controller_ctx *,
-                                  const char *ovnsb_remote);
 static void parse_options(int argc, char *argv[]);
 OVS_NO_RETURN static void usage(void);
 
@@ -78,7 +78,7 @@  struct pending_pkt {
     char *flow_s;
 };
 
-static char *ovs_remote;
+char *ovs_remote;
 
 struct local_datapath *
 get_local_datapath(const struct hmap *local_datapaths, uint32_t tunnel_key)
@@ -129,7 +129,7 @@  get_bridge(struct ovsdb_idl *ovs_idl, const char *br_name)
     return NULL;
 }
 
-static void
+void
 update_sb_monitors(struct ovsdb_idl *ovnsb_idl,
                    const struct sbrec_chassis *chassis,
                    const struct sset *local_ifaces,
@@ -257,7 +257,7 @@  create_br_int(struct controller_ctx *ctx)
     return bridge;
 }
 
-static const struct ovsrec_bridge *
+const struct ovsrec_bridge *
 get_br_int(struct controller_ctx *ctx)
 {
     const struct ovsrec_open_vswitch *cfg;
@@ -269,7 +269,7 @@  get_br_int(struct controller_ctx *ctx)
     return get_bridge(ctx->ovs_idl, br_int_name(cfg));
 }
 
-static const char *
+const char *
 get_chassis_id(const struct ovsdb_idl *ovs_idl)
 {
     const struct ovsrec_open_vswitch *cfg = ovsrec_open_vswitch_first(ovs_idl);
@@ -309,7 +309,7 @@  update_ssl_config(const struct ovsdb_idl *ovs_idl)
 
 /* Retrieves the OVN Southbound remote location from the
  * "external-ids:ovn-remote" key in 'ovs_idl' and returns a copy of it. */
-static char *
+char *
 get_ovnsb_remote(struct ovsdb_idl *ovs_idl)
 {
     while (1) {
@@ -498,6 +498,22 @@  get_nb_cfg(struct ovsdb_idl *idl)
 }
 
 static void
+ctrl_thread_create(struct ctrl_thread *thread, const char *name,
+    void *(*start)(void *))
+{
+    latch_init(&thread->exit_latch);
+    thread->thread = ovs_thread_create(name, start, thread);
+}
+
+static void
+ctrl_thread_exit(struct ctrl_thread *thread)
+{
+    latch_set(&thread->exit_latch);
+    xpthread_join(thread->thread, NULL);
+    latch_destroy(&thread->exit_latch);
+}
+
+void
 ctrl_register_ovs_idl(struct ovsdb_idl *ovs_idl)
 {
     /* We do not monitor all tables by default, so modules must register
@@ -574,6 +590,22 @@  create_ovnsb_indexes(struct ovsdb_idl *ovnsb_idl)
                                OVSDB_INDEX_ASC, NULL);
 }
 
+void
+connect_ovnsb(struct ovsdb_idl_loop *ovnsb_idl_loop,
+              struct ovnsb_cursors *cursors,
+              const char *ovnsb_remote)
+{
+    ovnsb_idl_loop->idl = ovsdb_idl_create(ovnsb_remote,
+            &sbrec_idl_class, true, true);
+
+    create_ovnsb_indexes(ovnsb_idl_loop->idl);
+    lport_init(cursors, ovnsb_idl_loop->idl);
+
+    ovsdb_idl_omit_alert(ovnsb_idl_loop->idl, &sbrec_chassis_col_nb_cfg);
+    update_sb_monitors(ovnsb_idl_loop->idl, NULL, NULL, NULL);
+    ovsdb_idl_get_initial_snapshot(ovnsb_idl_loop->idl);
+}
+
 int
 main(int argc, char *argv[])
 {
@@ -605,7 +637,6 @@  main(int argc, char *argv[])
     daemonize_complete();
 
     ofctrl_init(&group_table);
-    pinctrl_init();
     lflow_init();
 
     /* Connect to OVS OVSDB instance. */
@@ -616,16 +647,9 @@  main(int argc, char *argv[])
 
     /* Connect to OVN SB database and get a snapshot. */
     char *ovnsb_remote = get_ovnsb_remote(ovs_idl_loop.idl);
-    struct ovsdb_idl_loop ovnsb_idl_loop = OVSDB_IDL_LOOP_INITIALIZER(
-        ovsdb_idl_create(ovnsb_remote, &sbrec_idl_class, true, true));
-
-    create_ovnsb_indexes(ovnsb_idl_loop.idl);
+    struct ovsdb_idl_loop ovnsb_idl_loop = OVSDB_IDL_LOOP_INITIALIZER(NULL);
     struct ovnsb_cursors ovnsb_cursors;
-    lport_init(ovnsb_cursors, ovnsb_idl_loop.idl);
-
-    ovsdb_idl_omit_alert(ovnsb_idl_loop.idl, &sbrec_chassis_col_nb_cfg);
-    update_sb_monitors(ovnsb_idl_loop.idl, NULL, NULL, NULL);
-    ovsdb_idl_get_initial_snapshot(ovnsb_idl_loop.idl);
+    connect_ovnsb(&ovnsb_idl_loop, &ovnsb_cursors, ovnsb_remote);
 
     /* Initialize connection tracking zones. */
     struct simap ct_zones = SIMAP_INITIALIZER(&ct_zones);
@@ -641,6 +665,10 @@  main(int argc, char *argv[])
     unixctl_command_register("inject-pkt", "MICROFLOW", 1, 1, inject_pkt,
                              &pending_pkt);
 
+
+    struct ctrl_thread pinctrl_thread;
+    ctrl_thread_create(&pinctrl_thread, "pinctrl", pinctrl_thread_main);
+
     /* Main loop. */
     exiting = false;
     while (!exiting) {
@@ -705,8 +733,6 @@  main(int argc, char *argv[])
             enum mf_field_id mff_ovn_geneve = ofctrl_run(br_int,
                                                          &pending_ct_zones);
 
-            pinctrl_run(&ctx, br_int, chassis, &chassis_index,
-                        &local_datapaths, &active_tunnels);
             update_ct_zones(&local_lports, &local_datapaths, &ct_zones,
                             ct_zone_bitmap, &pending_ct_zones);
             if (ctx.ovs_idl_txn) {
@@ -791,7 +817,6 @@  main(int argc, char *argv[])
 
         if (br_int) {
             ofctrl_wait();
-            pinctrl_wait(&ctx);
         }
         ovsdb_idl_loop_commit_and_wait(&ovnsb_idl_loop);
 
@@ -840,10 +865,12 @@  main(int argc, char *argv[])
         poll_block();
     }
 
+    /* stop child controller threads */
+    ctrl_thread_exit(&pinctrl_thread);
+
     unixctl_server_destroy(unixctl);
     lflow_destroy();
     ofctrl_destroy();
-    pinctrl_destroy();
 
     simap_destroy(&ct_zones);
 
@@ -1000,7 +1027,7 @@  inject_pkt(struct unixctl_conn *conn, int argc OVS_UNUSED,
 
 /* Get the desired SB probe timer from the OVS database and configure it into
  * the SB database. */
-static void
+void
 update_probe_interval(struct controller_ctx *ctx, const char *ovnsb_remote)
 {
     const struct ovsrec_open_vswitch *cfg
diff --git a/ovn/controller/ovn-controller.h b/ovn/controller/ovn-controller.h
index f57c557..1234c2b 100644
--- a/ovn/controller/ovn-controller.h
+++ b/ovn/controller/ovn-controller.h
@@ -18,7 +18,9 @@ 
 #define OVN_CONTROLLER_H 1
 
 #include "simap.h"
+#include "sset.h"
 #include "ovn/lib/ovn-sb-idl.h"
+#include "latch.h"
 
 /* Linux supports a maximum of 64K zones, which seems like a fine default. */
 #define MAX_CT_ZONES 65535
@@ -71,6 +73,13 @@  struct local_datapath {
     size_t n_peer_dps;
 };
 
+struct ctrl_thread {
+    pthread_t thread;
+
+    /* Controls thread exit. */
+    struct latch exit_latch;
+};
+
 struct local_datapath *get_local_datapath(const struct hmap *,
                                           uint32_t tunnel_key);
 
@@ -90,5 +99,34 @@  enum chassis_tunnel_type {
 
 uint32_t get_tunnel_type(const char *name);
 
+/* Retrieves the OVN Southbound remote location from the
+ * "external-ids:ovn-remote" key in 'ovs_idl' and returns a copy of it. */
+char *get_ovnsb_remote(struct ovsdb_idl *ovs_idl);
+
+void
+update_sb_monitors(struct ovsdb_idl *ovnsb_idl,
+                   const struct sbrec_chassis *chassis,
+                   const struct sset *local_ifaces,
+                   struct hmap *local_datapaths);
+
+/* Get the desired SB probe timer from the OVS database and configure it into
+ * the SB database. */
+void
+update_probe_interval(struct controller_ctx *ctx, const char *ovnsb_remote);
+
+const struct ovsrec_bridge *
+get_br_int(struct controller_ctx *ctx);
+
+const char *
+get_chassis_id(const struct ovsdb_idl *ovs_idl);
+
+void
+ctrl_register_ovs_idl(struct ovsdb_idl *ovs_idl);
+
+void
+connect_ovnsb(struct ovsdb_idl_loop *ovnsb_idl_loop,
+              struct ovnsb_cursors *cursors,
+              const char *ovnsb_remote);
 
+extern char *ovs_remote;
 #endif /* ovn/ovn-controller.h */
diff --git a/ovn/controller/pinctrl.c b/ovn/controller/pinctrl.c
index 9412b48..8a30be2 100644
--- a/ovn/controller/pinctrl.c
+++ b/ovn/controller/pinctrl.c
@@ -18,6 +18,7 @@ 
 
 #include "pinctrl.h"
 
+#include "bfd.h"
 #include "coverage.h"
 #include "csum.h"
 #include "dirs.h"
@@ -41,6 +42,7 @@ 
 #include "ovn/lex.h"
 #include "ovn/lib/acl-log.h"
 #include "ovn/lib/logical-fields.h"
+#include "ovn/lib/chassis-index.h"
 #include "ovn/lib/ovn-dhcp.h"
 #include "ovn/lib/ovn-util.h"
 #include "poll-loop.h"
@@ -48,6 +50,8 @@ 
 #include "socket-util.h"
 #include "timeval.h"
 #include "vswitch-idl.h"
+#include "latch.h"
+#include "binding.h"
 
 VLOG_DEFINE_THIS_MODULE(pinctrl);
 
@@ -84,6 +88,107 @@  static void reload_metadata(struct ofpbuf *ofpacts,
 
 COVERAGE_DEFINE(pinctrl_drop_put_mac_binding);
 
+void *
+pinctrl_thread_main(void *arg)
+{
+    struct ctrl_thread *thread = arg;
+    pinctrl_init();
+
+    /* Connect to OVS OVSDB instance. */
+    struct ovsdb_idl_loop ovs_idl_loop = OVSDB_IDL_LOOP_INITIALIZER(
+        ovsdb_idl_create(ovs_remote, &ovsrec_idl_class, false, true));
+    ctrl_register_ovs_idl(ovs_idl_loop.idl);
+    ovsdb_idl_get_initial_snapshot(ovs_idl_loop.idl);
+
+    /* Connect to OVN SB database and get a snapshot. */
+    char *ovnsb_remote = get_ovnsb_remote(ovs_idl_loop.idl);
+    struct ovsdb_idl_loop ovnsb_idl_loop = OVSDB_IDL_LOOP_INITIALIZER(NULL);
+    struct ovnsb_cursors ovnsb_cursors;
+    connect_ovnsb(&ovnsb_idl_loop, &ovnsb_cursors, ovnsb_remote);
+
+    while (!latch_is_set(&thread->exit_latch)) {
+        /* Below logic is similar as in main loop in ovn-controller.c,
+         * while the purpose here is packet-in processing only */
+        char *new_ovnsb_remote = get_ovnsb_remote(ovs_idl_loop.idl);
+        if (strcmp(ovnsb_remote, new_ovnsb_remote)) {
+            free(ovnsb_remote);
+            ovnsb_remote = new_ovnsb_remote;
+            ovsdb_idl_set_remote(ovnsb_idl_loop.idl, ovnsb_remote, true);
+        } else {
+            free(new_ovnsb_remote);
+        }
+
+        struct controller_ctx ctx = {
+            .ovs_idl = ovs_idl_loop.idl,
+            .ovs_idl_txn = ovsdb_idl_loop_run(&ovs_idl_loop),
+            .ovnsb_idl = ovnsb_idl_loop.idl,
+            .ovnsb_idl_txn = ovsdb_idl_loop_run(&ovnsb_idl_loop),
+            .ovnsb_cursors = &ovnsb_cursors,
+        };
+
+        update_probe_interval(&ctx, ovnsb_remote);
+
+        struct hmap local_datapaths = HMAP_INITIALIZER(&local_datapaths);
+        struct sset local_lports = SSET_INITIALIZER(&local_lports);
+        struct sset active_tunnels = SSET_INITIALIZER(&active_tunnels);
+        const struct ovsrec_bridge *br_int = get_br_int(&ctx);
+        const char *chassis_id = get_chassis_id(ctx.ovs_idl);
+
+        struct chassis_index chassis_index;
+
+        chassis_index_init(&chassis_index, ctx.ovnsb_idl);
+
+        if (ctx.ovnsb_idl_txn) {
+            const struct sbrec_chassis *chassis = NULL;
+            if (chassis_id) {
+                chassis = get_chassis(ctx.ovnsb_idl, chassis_id);
+                bfd_calculate_active_tunnels(br_int, &active_tunnels);
+                binding_get(&ctx, br_int, chassis,
+                            &chassis_index, &active_tunnels, &local_datapaths,
+                            &local_lports);
+            }
+
+            if (br_int && chassis) {
+                pinctrl_run(&ctx, br_int, chassis, &chassis_index,
+                            &local_datapaths, &active_tunnels);
+                update_sb_monitors(ctx.ovnsb_idl, chassis,
+                                   &local_lports, &local_datapaths);
+            }
+        }
+
+        chassis_index_destroy(&chassis_index);
+        sset_destroy(&local_lports);
+        sset_destroy(&active_tunnels);
+
+        struct local_datapath *cur_node, *next_node;
+        HMAP_FOR_EACH_SAFE (cur_node, next_node, hmap_node, &local_datapaths) {
+            free(cur_node->peer_dps);
+            hmap_remove(&local_datapaths, &cur_node->hmap_node);
+            free(cur_node);
+        }
+        hmap_destroy(&local_datapaths);
+
+        if (br_int) {
+            pinctrl_wait(&ctx);
+        }
+        ovsdb_idl_loop_commit_and_wait(&ovnsb_idl_loop);
+        ovsdb_idl_loop_commit_and_wait(&ovs_idl_loop);
+
+        latch_wait(&thread->exit_latch);
+        poll_block();
+    }
+
+    pinctrl_destroy();
+
+    ovsdb_idl_loop_destroy(&ovs_idl_loop);
+    ovsdb_idl_loop_destroy(&ovnsb_idl_loop);
+
+    free(ovnsb_remote);
+
+    VLOG_INFO("pinctrl thread done");
+    return NULL;
+}
+
 void
 pinctrl_init(void)
 {
diff --git a/ovn/controller/pinctrl.h b/ovn/controller/pinctrl.h
index fc9cca8..15b12a0 100644
--- a/ovn/controller/pinctrl.h
+++ b/ovn/controller/pinctrl.h
@@ -34,6 +34,7 @@  void pinctrl_run(struct controller_ctx *,
                  const struct ovsrec_bridge *, const struct sbrec_chassis *,
                  const struct chassis_index *, struct hmap *local_datapaths,
                  struct sset *active_tunnels);
+void *pinctrl_thread_main(void *arg);
 void pinctrl_wait(struct controller_ctx *);
 void pinctrl_destroy(void);