diff mbox series

[ovs-dev,v6,2/8] dpif-netdev: Register packet processing cores to KA framework.

Message ID 1512734667-23202-3-git-send-email-bhanuprakash.bodireddy@intel.com
State Changes Requested
Headers show
Series Add OVS DPDK keep-alive functionality. | expand

Commit Message

Bodireddy, Bhanuprakash Dec. 8, 2017, 12:04 p.m. UTC
This commit registers the packet processing PMD threads to keepalive
framework. Only PMDs that have rxqs mapped will be registered and
actively monitored by KA framework.

This commit spawns a keepalive thread that will dispatch heartbeats to
PMD threads. The pmd threads respond to heartbeats by marking themselves
alive. As long as PMD responds to heartbeats it is considered 'healthy'.

Signed-off-by: Bhanuprakash Bodireddy <bhanuprakash.bodireddy@intel.com>
---
 lib/dpif-netdev.c |  79 ++++++++++++++++++++++
 lib/keepalive.c   | 194 ++++++++++++++++++++++++++++++++++++++++++++++++++++--
 lib/keepalive.h   |  20 ++++++
 lib/ovs-thread.c  |   6 ++
 lib/ovs-thread.h  |   1 +
 lib/util.c        |  22 +++++++
 lib/util.h        |   1 +
 7 files changed, 318 insertions(+), 5 deletions(-)

Comments

Fischetti, Antonio Dec. 15, 2017, 2:24 p.m. UTC | #1
LGTM,

Tested-by: Antonio Fischetti <antonio.fischetti@intel.com>
Acked-by: Antonio Fischetti <antonio.fischetti@intel.com>


> -----Original Message-----
> From: ovs-dev-bounces@openvswitch.org [mailto:ovs-dev-
> bounces@openvswitch.org] On Behalf Of Bhanuprakash Bodireddy
> Sent: Friday, December 8, 2017 12:04 PM
> To: dev@openvswitch.org
> Subject: [ovs-dev] [PATCH v6 2/8] dpif-netdev: Register packet
> processing cores to KA framework.
> 
> This commit registers the packet processing PMD threads to keepalive
> framework. Only PMDs that have rxqs mapped will be registered and
> actively monitored by KA framework.
> 
> This commit spawns a keepalive thread that will dispatch heartbeats to
> PMD threads. The pmd threads respond to heartbeats by marking themselves
> alive. As long as PMD responds to heartbeats it is considered 'healthy'.
> 
> Signed-off-by: Bhanuprakash Bodireddy <bhanuprakash.bodireddy@intel.com>
> ---
>  lib/dpif-netdev.c |  79 ++++++++++++++++++++++
>  lib/keepalive.c   | 194
> ++++++++++++++++++++++++++++++++++++++++++++++++++++--
>  lib/keepalive.h   |  20 ++++++
>  lib/ovs-thread.c  |   6 ++
>  lib/ovs-thread.h  |   1 +
>  lib/util.c        |  22 +++++++
>  lib/util.h        |   1 +
>  7 files changed, 318 insertions(+), 5 deletions(-)
> 
> diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
> index 07f6113..c978a76 100644
> --- a/lib/dpif-netdev.c
> +++ b/lib/dpif-netdev.c
> @@ -49,6 +49,7 @@
>  #include "flow.h"
>  #include "hmapx.h"
>  #include "id-pool.h"
> +#include "keepalive.h"
>  #include "latch.h"
>  #include "netdev.h"
>  #include "netdev-vport.h"
> @@ -592,6 +593,7 @@ struct dp_netdev_pmd_thread {
>      atomic_bool reload;             /* Do we need to reload ports? */
>      pthread_t thread;
>      unsigned core_id;               /* CPU core id of this pmd thread.
> */
> +    pid_t tid;                      /* PMD thread tid. */
>      int numa_id;                    /* numa node id of this pmd thread.
> */
>      bool isolated;
> 
> @@ -1018,6 +1020,72 @@ sorted_poll_thread_list(struct dp_netdev *dp,
>      *n = k;
>  }
> 
> +static void *
> +ovs_keepalive(void *f_ OVS_UNUSED)
> +{
> +    pthread_detach(pthread_self());
> +
> +    for (;;) {
> +        uint64_t interval;
> +
> +        interval = get_ka_interval();
> +        xnanosleep(interval * 1000 * 1000);
> +    }
> +
> +    return NULL;
> +}
> +
> +/* Kickstart 'ovs_keepalive' thread. */
> +static void
> +ka_thread_start(struct dp_netdev *dp)
> +{
> +    static struct ovsthread_once once = OVSTHREAD_ONCE_INITIALIZER;
> +
> +    if (ovsthread_once_start(&once)) {
> +        ovs_thread_create("ovs_keepalive", ovs_keepalive, dp);
> +
> +        ovsthread_once_done(&once);
> +    }
> +}
> +
> +/* Register the datapath threads. This gets invoked on every datapath
> + * reconfiguration. The pmd thread[s] having rxq[s] mapped will be
> + * registered to KA framework.
> + */
> +static void
> +ka_register_datapath_threads(struct dp_netdev *dp)
> +{
> +    if (!ka_is_enabled()) {
> +        return;
> +    }
> +
> +    ka_thread_start(dp);
> +
> +    ka_reload_datapath_threads_begin();
> +
> +    struct dp_netdev_pmd_thread *pmd;
> +    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
> +        /*  Register only PMD threads. */
> +        if (pmd->core_id != NON_PMD_CORE_ID) {
> +            /* Skip PMD thread with no rxqs mapping. */
> +            if (OVS_UNLIKELY(!hmap_count(&pmd->poll_list))) {
> +                /* Rxq mapping changes due to datapath reconfiguration.
> +                 * If no rxqs mapped to PMD now due to reconfiguration,
> +                 * unregister the pmd thread. */
> +                ka_unregister_thread(pmd->tid);
> +                continue;
> +            }
> +
> +            ka_register_thread(pmd->tid);
> +            VLOG_INFO("Registered PMD thread [%d] on Core[%d] to KA
> framework",
> +                      pmd->tid, pmd->core_id);
> +        }
> +    }
> +    ka_cache_registered_threads();
> +
> +    ka_reload_datapath_threads_end();
> +}
> +
>  static void
>  dpif_netdev_pmd_rebalance(struct unixctl_conn *conn, int argc,
>                            const char *argv[], void *aux OVS_UNUSED)
> @@ -3819,6 +3887,9 @@ reconfigure_datapath(struct dp_netdev *dp)
> 
>      /* Reload affected pmd threads. */
>      reload_affected_pmds(dp);
> +
> +    /* Register datapath threads to KA monitoring. */
> +    ka_register_datapath_threads(dp);
>  }
> 
>  /* Returns true if one of the netdevs in 'dp' requires a
> reconfiguration */
> @@ -4023,6 +4094,8 @@ pmd_thread_main(void *f_)
> 
>      /* Stores the pmd thread's 'pmd' to 'per_pmd_key'. */
>      ovsthread_setspecific(pmd->dp->per_pmd_key, pmd);
> +    /* Stores tid in to 'pmd->tid'. */
> +    ovsthread_set_tid(&pmd->tid);
>      ovs_numa_thread_setaffinity_core(pmd->core_id);
>      dpdk_set_lcore_id(pmd->core_id);
>      poll_cnt = pmd_load_queues_and_ports(pmd, &poll_list);
> @@ -4056,6 +4129,9 @@ reload:
>                                                        :
> PMD_CYCLES_IDLE);
>          }
> 
> +        /* Mark PMD thread alive. */
> +        ka_mark_pmd_thread_alive(pmd->tid);
> +
>          if (lc++ > 1024) {
>              bool reload;
> 
> @@ -4089,6 +4165,9 @@ reload:
>      }
> 
>      emc_cache_uninit(&pmd->flow_cache);
> +
> +    ka_unregister_thread(pmd->tid);
> +
>      free(poll_list);
>      pmd_free_cached_ports(pmd);
>      return NULL;
> diff --git a/lib/keepalive.c b/lib/keepalive.c
> index ca8dccb..b04877f 100644
> --- a/lib/keepalive.c
> +++ b/lib/keepalive.c
> @@ -19,6 +19,7 @@
>  #include "keepalive.h"
>  #include "lib/vswitch-idl.h"
>  #include "openvswitch/vlog.h"
> +#include "process.h"
>  #include "seq.h"
>  #include "timeval.h"
> 
> @@ -28,11 +29,19 @@ static bool keepalive_enable = false;      /*
> Keepalive disabled by default. */
>  static uint32_t keepalive_timer_interval;  /* keepalive timer interval.
> */
>  static struct keepalive_info ka_info;
> 
> -/* Returns true if keepalive is enabled, false otherwise. */
> -bool
> -ka_is_enabled(void)
> +/* Returns true if state update is allowed, false otherwise. */
> +static bool
> +ka_can_update_state(void)
>  {
> -    return keepalive_enable;
> +    bool reload_inprogress;
> +    bool ka_enable;
> +
> +    atomic_read_relaxed(&ka_info.reload_threads, &reload_inprogress);
> +    ka_enable = ka_is_enabled();
> +
> +    /* Return true if KA is enabled and 'cached_process_list' map
> reload
> +     * is completed. */
> +    return ka_enable && !reload_inprogress;
>  }
> 
>  /* Finds the thread by 'tid' in 'process_list' map and update
> @@ -49,7 +58,7 @@ ka_set_thread_state_ts(pid_t tid, enum keepalive_state
> state,
>      ovs_mutex_lock(&ka_info.proclist_mutex);
>      HMAP_FOR_EACH_WITH_HASH (pinfo, node, hash_int(tid, 0),
>                               &ka_info.process_list) {
> -        if (pinfo->tid == tid) {
> +        if (OVS_LIKELY(pinfo->tid == tid)) {
>              pinfo->state = state;
>              pinfo->last_seen_time = last_alive;
>          }
> @@ -104,6 +113,177 @@ ka_register_relay_cb(ka_relay_cb cb, void *aux)
>      ka_info.relay_cb_data = aux;
>  }
> 
> +/* Returns true if keepalive is enabled, false otherwise. */
> +bool
> +ka_is_enabled(void)
> +{
> +    return keepalive_enable;
> +}
> +
> +/* Return the Keepalive timer interval. */
> +uint32_t
> +get_ka_interval(void)
> +{
> +    return keepalive_timer_interval;
> +}
> +
> +/* 'cached_process_list' map reload in progress.
> + *
> + * Should be called before the 'ka_info.cached_process_list'
> + * is populated from 'ka_info.process_list'. This way the pmd
> + * doesn't heartbeat while the reload is in progress. */
> +void
> +ka_reload_datapath_threads_begin(void)
> +{
> +    atomic_store_relaxed(&ka_info.reload_threads, true);
> +}
> +
> +/* 'cached_process_list' map reload finished.
> + *
> + * Should be called after the 'ka_info.cached_process_list'
> + * is populated from 'ka_info.process_list'. This way the pmd
> + * can restart heartbeat when the reload is finished. */
> +void
> +ka_reload_datapath_threads_end(void)
> +{
> +    atomic_store_relaxed(&ka_info.reload_threads, false);
> +}
> +
> +/* Register thread to KA framework. */
> +void
> +ka_register_thread(pid_t tid)
> +{
> +    if (ka_is_enabled()) {
> +        struct ka_process_info *ka_pinfo;
> +        int core_id = -1;
> +        char proc_name[18] = "UNDEFINED";
> +
> +        struct process_info pinfo;
> +        int success = get_process_info(tid, &pinfo);
> +        if (success) {
> +            core_id = pinfo.core_id;
> +            ovs_strlcpy(proc_name, pinfo.name, sizeof proc_name);
> +        }
> +
> +        uint32_t hash = hash_int(tid, 0);
> +        ovs_mutex_lock(&ka_info.proclist_mutex);
> +        HMAP_FOR_EACH_WITH_HASH (ka_pinfo, node,
> +                                 hash, &ka_info.process_list) {
> +            /* Thread is already registered. */
> +            if (ka_pinfo->tid == tid) {
> +                goto out;
> +            }
> +        }
> +
> +        ka_pinfo = xmalloc(sizeof *ka_pinfo);
> +        ka_pinfo->tid = tid;
> +        ka_pinfo->core_id = core_id;
> +        ovs_strlcpy(ka_pinfo->name, proc_name, sizeof ka_pinfo->name);
> +
> +        hmap_insert(&ka_info.process_list, &ka_pinfo->node, hash);
> +
> +        ka_pinfo->state = KA_STATE_ALIVE;
> +        ka_pinfo->last_seen_time = time_wall_msec();
> +        ka_info.thread_cnt++;  /* Increment count of registered
> threads. */
> +out:
> +        ovs_mutex_unlock(&ka_info.proclist_mutex);
> +    }
> +}
> +
> +/* Unregister thread from KA framework. */
> +void
> +ka_unregister_thread(pid_t tid)
> +{
> +    if (ka_is_enabled()) {
> +        struct ka_process_info *ka_pinfo;
> +
> +        ovs_mutex_lock(&ka_info.proclist_mutex);
> +        HMAP_FOR_EACH_WITH_HASH (ka_pinfo, node, hash_int(tid, 0),
> +                                 &ka_info.process_list) {
> +            /* If thread is registered, remove it from the list */
> +            if (ka_pinfo->tid == tid) {
> +                hmap_remove(&ka_info.process_list, &ka_pinfo->node);
> +                free(ka_pinfo);
> +
> +                ka_pinfo->state = KA_STATE_UNUSED;
> +                ka_info.thread_cnt--;  /* Decrement thread count. */
> +                break;
> +            }
> +        }
> +        ovs_mutex_unlock(&ka_info.proclist_mutex);
> +    }
> +}
> +
> +/* Free the 'ka_info.cached_process_list' list. */
> +void
> +ka_free_cached_threads(void)
> +{
> +    struct ka_process_info *pinfo_cached;
> +    /* Free threads in the cached list. */
> +    HMAP_FOR_EACH_POP (pinfo_cached, node,
> &ka_info.cached_process_list) {
> +        free(pinfo_cached);
> +    }
> +    hmap_shrink(&ka_info.cached_process_list);
> +}
> +
> +/* Cache the list of registered threads from 'ka_info.process_list'
> + * map into 'ka_info.cached_process_list.
> + *
> + * 'cached_process_list' map is an exact copy of 'process_list' that
> will
> + * be updated by 'pmd' and 'ovs_keepalive' threads as part of heartbeat
> + * mechanism.  This cached copy is created so that the heartbeats can
> be
> + * performed with out acquiring locks.
> + *
> + * On datapath reconfiguration, both the 'process_list' and the cached
> copy
> + * 'cached_process_list' is updated after setting 'reload_threads' to
> 'true'
> + * so that pmd doesn't heartbeat while the maps are updated.
> + *
> + */
> +void
> +ka_cache_registered_threads(void)
> +{
> +    struct ka_process_info *pinfo, *next, *pinfo_cached;
> +
> +    ka_free_cached_threads();
> +
> +    HMAP_FOR_EACH_SAFE (pinfo, next, node, &ka_info.process_list) {
> +        pinfo_cached = xmemdup(pinfo, sizeof *pinfo_cached);
> +        hmap_insert(&ka_info.cached_process_list, &pinfo_cached->node,
> +                     hash_int(pinfo->tid,0));
> +    }
> +}
> +
> +/* Mark packet processing thread alive. */
> +void
> +ka_mark_pmd_thread_alive(int tid)
> +{
> +    if (ka_can_update_state()) {
> +        struct ka_process_info *pinfo;
> +        HMAP_FOR_EACH_WITH_HASH (pinfo, node, hash_int(tid, 0),
> +                             &ka_info.cached_process_list) {
> +            if (OVS_LIKELY(pinfo->tid == tid)) {
> +                pinfo->state = KA_STATE_ALIVE;
> +            }
> +        }
> +    }
> +}
> +
> +/* Mark packet processing thread as sleeping. */
> +void
> +ka_mark_pmd_thread_sleep(int tid)
> +{
> +    if (ka_can_update_state()) {
> +        struct ka_process_info *pinfo;
> +
> +        HMAP_FOR_EACH_WITH_HASH (pinfo, node, hash_int(tid, 0),
> +                             &ka_info.cached_process_list) {
> +            if (pinfo->tid == tid) {
> +                pinfo->state = KA_STATE_SLEEP;
> +            }
> +        }
> +    }
> +}
> +
>  void
>  ka_init(const struct smap *ovs_other_config)
>  {
> @@ -120,6 +300,7 @@ ka_init(const struct smap *ovs_other_config)
>              ka_register_relay_cb(ka_update_thread_state, NULL);
>              ovs_mutex_init(&ka_info.proclist_mutex);
>              hmap_init(&ka_info.process_list);
> +            hmap_init(&ka_info.cached_process_list);
> 
>              ka_info.init_time = time_wall_msec();
> 
> @@ -143,5 +324,8 @@ ka_destroy(void)
>      ovs_mutex_unlock(&ka_info.proclist_mutex);
> 
>      hmap_destroy(&ka_info.process_list);
> +
> +    ka_free_cached_threads();
> +    hmap_destroy(&ka_info.cached_process_list);
>      ovs_mutex_destroy(&ka_info.proclist_mutex);
>  }
> diff --git a/lib/keepalive.h b/lib/keepalive.h
> index a738daa..7674ea3 100644
> --- a/lib/keepalive.h
> +++ b/lib/keepalive.h
> @@ -48,6 +48,9 @@ enum keepalive_state {
>  };
> 
>  struct ka_process_info {
> +    /* Process name. */
> +    char name[16];
> +
>      /* Thread id of the process, retrieved using ovs_gettid(). */
>      pid_t tid;
> 
> @@ -71,15 +74,32 @@ struct keepalive_info {
>      /* List of process/threads monitored by KA framework. */
>      struct hmap process_list OVS_GUARDED;
> 
> +    /* cached copy of 'process_list' list. */
> +    struct hmap cached_process_list;
> +
> +    /* count of threads registered to KA framework. */
> +    uint32_t thread_cnt;
> +
>      /* Keepalive initialization time. */
>      uint64_t init_time;
> 
>      /* keepalive relay handler. */
>      ka_relay_cb relay_cb;
>      void *relay_cb_data;
> +
> +    atomic_bool reload_threads;   /* Reload threads in to cached list.
> */
>  };
> 
>  bool ka_is_enabled(void);
> +uint32_t get_ka_interval(void);
> +void ka_reload_datapath_threads_begin(void);
> +void ka_reload_datapath_threads_end(void);
> +void ka_register_thread(pid_t);
> +void ka_unregister_thread(pid_t);
> +void ka_free_cached_threads(void);
> +void ka_cache_registered_threads(void);
> +void ka_mark_pmd_thread_alive(int);
> +void ka_mark_pmd_thread_sleep(int);
>  void ka_init(const struct smap *);
>  void ka_destroy(void);
> 
> diff --git a/lib/ovs-thread.c b/lib/ovs-thread.c
> index f8bc06d..ae8e450 100644
> --- a/lib/ovs-thread.c
> +++ b/lib/ovs-thread.c
> @@ -597,6 +597,12 @@ thread_is_pmd(void)
>      return !strncmp(name, "pmd", 3);
>  }
> 
> +void
> +ovsthread_set_tid(pid_t *tid)
> +{
> +    *tid = ovs_get_tid();
> +}
> +
>  

>  /* ovsthread_key. */
> 
> diff --git a/lib/ovs-thread.h b/lib/ovs-thread.h
> index 55e51a4..cfb4b04 100644
> --- a/lib/ovs-thread.h
> +++ b/lib/ovs-thread.h
> @@ -524,5 +524,6 @@ bool may_fork(void);
> 
>  int count_cpu_cores(void);
>  bool thread_is_pmd(void);
> +void ovsthread_set_tid(pid_t *);
> 
>  #endif /* ovs-thread.h */
> diff --git a/lib/util.c b/lib/util.c
> index 2965656..927929b 100644
> --- a/lib/util.c
> +++ b/lib/util.c
> @@ -26,6 +26,12 @@
>  #include <stdlib.h>
>  #include <string.h>
>  #include <sys/stat.h>
> +#ifdef __linux__
> +#include <sys/syscall.h>
> +#endif
> +#ifdef __FreeBSD__
> +#include <sys/thr.h>
> +#endif
>  #include <unistd.h>
>  #include "bitmap.h"
>  #include "byte-order.h"
> @@ -575,6 +581,22 @@ get_page_size(void)
>      return cached;
>  }
> 
> +/* Returns the tid of the calling thread if supported, -EINVAL
> otherwise. */
> +pid_t
> +ovs_get_tid(void)
> +{
> +#ifdef __linux__
> +    return syscall(SYS_gettid);
> +#elif defined(__FreeBSD__) || defined(__NetBSD__)
> +    long tid;
> +    thr_self(&tid);
> +    return (pid_t)tid;
> +#endif
> +
> +    VLOG_ERR("ovs_get_tid(): unsupported.");
> +    return -EINVAL;
> +}
> +
>  /* Returns the time at which the system booted, as the number of
> milliseconds
>   * since the epoch, or 0 if the time of boot cannot be determined. */
>  long long int
> diff --git a/lib/util.h b/lib/util.h
> index b01f421..259346d 100644
> --- a/lib/util.h
> +++ b/lib/util.h
> @@ -156,6 +156,7 @@ void free_cacheline(void *);
> 
>  void ovs_strlcpy(char *dst, const char *src, size_t size);
>  void ovs_strzcpy(char *dst, const char *src, size_t size);
> +pid_t ovs_get_tid(void);
> 
>  /* The C standards say that neither the 'dst' nor 'src' argument to
>   * memcpy() may be null, even if 'n' is zero.  This wrapper tolerates
> --
> 2.4.11
> 
> _______________________________________________
> dev mailing list
> dev@openvswitch.org
> https://mail.openvswitch.org/mailman/listinfo/ovs-dev
Kevin Traynor Jan. 22, 2018, 4:21 p.m. UTC | #2
On 12/08/2017 12:04 PM, Bhanuprakash Bodireddy wrote:
> This commit registers the packet processing PMD threads to keepalive
> framework. Only PMDs that have rxqs mapped will be registered and
> actively monitored by KA framework.
> 
> This commit spawns a keepalive thread that will dispatch heartbeats to
> PMD threads. The pmd threads respond to heartbeats by marking themselves
> alive. As long as PMD responds to heartbeats it is considered 'healthy'.
> 
> Signed-off-by: Bhanuprakash Bodireddy <bhanuprakash.bodireddy@intel.com>
> ---
>  lib/dpif-netdev.c |  79 ++++++++++++++++++++++
>  lib/keepalive.c   | 194 ++++++++++++++++++++++++++++++++++++++++++++++++++++--
>  lib/keepalive.h   |  20 ++++++
>  lib/ovs-thread.c  |   6 ++
>  lib/ovs-thread.h  |   1 +
>  lib/util.c        |  22 +++++++
>  lib/util.h        |   1 +
>  7 files changed, 318 insertions(+), 5 deletions(-)
> 
> diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
> index 07f6113..c978a76 100644
> --- a/lib/dpif-netdev.c
> +++ b/lib/dpif-netdev.c
> @@ -49,6 +49,7 @@
>  #include "flow.h"
>  #include "hmapx.h"
>  #include "id-pool.h"
> +#include "keepalive.h"
>  #include "latch.h"
>  #include "netdev.h"
>  #include "netdev-vport.h"
> @@ -592,6 +593,7 @@ struct dp_netdev_pmd_thread {
>      atomic_bool reload;             /* Do we need to reload ports? */
>      pthread_t thread;
>      unsigned core_id;               /* CPU core id of this pmd thread. */
> +    pid_t tid;                      /* PMD thread tid. */
>      int numa_id;                    /* numa node id of this pmd thread. */
>      bool isolated;
>  
> @@ -1018,6 +1020,72 @@ sorted_poll_thread_list(struct dp_netdev *dp,
>      *n = k;
>  }
>  
> +static void *
> +ovs_keepalive(void *f_ OVS_UNUSED)
> +{
> +    pthread_detach(pthread_self());
> +
> +    for (;;) {
> +        uint64_t interval;

variable is not needed
> +
> +        interval = get_ka_interval();
> +        xnanosleep(interval * 1000 * 1000);
> +    }
> +
> +    return NULL;
> +}
> +
> +/* Kickstart 'ovs_keepalive' thread. */
> +static void
> +ka_thread_start(struct dp_netdev *dp)

Some ka_* functions are in this file, some in keepalive.c, it's a bit
confusing

> +{
> +    static struct ovsthread_once once = OVSTHREAD_ONCE_INITIALIZER;
> +
> +    if (ovsthread_once_start(&once)) {
> +        ovs_thread_create("ovs_keepalive", ovs_keepalive, dp);
> +

extra blank line

> +        ovsthread_once_done(&once);
> +    }
> +}
> +
> +/* Register the datapath threads. This gets invoked on every datapath
> + * reconfiguration. The pmd thread[s] having rxq[s] mapped will be
> + * registered to KA framework.
> + */
> +static void
> +ka_register_datapath_threads(struct dp_netdev *dp)
> +{
> +    if (!ka_is_enabled()) {
> +        return;
> +    }
> +
> +    ka_thread_start(dp);
> +

extra blank line - take as a general comment

> +    ka_reload_datapath_threads_begin();
> +

why is this needed now when the next loop does not work on the
cached_process_list? Could it be moved down to before
ka_cache_registered_threads()

> +    struct dp_netdev_pmd_thread *pmd;
> +    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
> +        /*  Register only PMD threads. */
> +        if (pmd->core_id != NON_PMD_CORE_ID) {
> +            /* Skip PMD thread with no rxqs mapping. */
> +            if (OVS_UNLIKELY(!hmap_count(&pmd->poll_list))) {

remove the unlikely - you can't say if this is unlikely and the
optimization is not needed

Some PMDs will have rxq's now, but they will be removed at the start of
the next reconfiguration because something on that port is changing. So
for a time during reconfig, they will be registered but may not be
running. It might be ok considering reconfig should be quicker
than a heartbeat interval but you would need to check that case if you
have not already.

> +                /* Rxq mapping changes due to datapath reconfiguration.
> +                 * If no rxqs mapped to PMD now due to reconfiguration,
> +                 * unregister the pmd thread. */
> +                ka_unregister_thread(pmd->tid);

Why not allow pmd's without rxqs into sleep mode? Otherwise pmds will
appear and disappear.

> +                continue;
> +            }
> +
> +            ka_register_thread(pmd->tid);
> +            VLOG_INFO("Registered PMD thread [%d] on Core[%d] to KA framework",
> +                      pmd->tid, pmd->core_id);
> +        }
> +    }
> +    ka_cache_registered_threads();
> +
> +    ka_reload_datapath_threads_end();
> +}
> +
>  static void
>  dpif_netdev_pmd_rebalance(struct unixctl_conn *conn, int argc,
>                            const char *argv[], void *aux OVS_UNUSED)
> @@ -3819,6 +3887,9 @@ reconfigure_datapath(struct dp_netdev *dp)
>  
>      /* Reload affected pmd threads. */
>      reload_affected_pmds(dp);
> +
> +    /* Register datapath threads to KA monitoring. */
> +    ka_register_datapath_threads(dp);
>  }
>  
>  /* Returns true if one of the netdevs in 'dp' requires a reconfiguration */
> @@ -4023,6 +4094,8 @@ pmd_thread_main(void *f_)
>  
>      /* Stores the pmd thread's 'pmd' to 'per_pmd_key'. */
>      ovsthread_setspecific(pmd->dp->per_pmd_key, pmd);
> +    /* Stores tid in to 'pmd->tid'. */
> +    ovsthread_set_tid(&pmd->tid);
>      ovs_numa_thread_setaffinity_core(pmd->core_id);
>      dpdk_set_lcore_id(pmd->core_id);
>      poll_cnt = pmd_load_queues_and_ports(pmd, &poll_list);
> @@ -4056,6 +4129,9 @@ reload:
>                                                        : PMD_CYCLES_IDLE);
>          }
>  
> +        /* Mark PMD thread alive. */
> +        ka_mark_pmd_thread_alive(pmd->tid);
> +
>          if (lc++ > 1024) {
>              bool reload;
>  
> @@ -4089,6 +4165,9 @@ reload:
>      }
>  
>      emc_cache_uninit(&pmd->flow_cache);
> +
> +    ka_unregister_thread(pmd->tid);
> +
>      free(poll_list);
>      pmd_free_cached_ports(pmd);
>      return NULL;
> diff --git a/lib/keepalive.c b/lib/keepalive.c
> index ca8dccb..b04877f 100644
> --- a/lib/keepalive.c
> +++ b/lib/keepalive.c
> @@ -19,6 +19,7 @@
>  #include "keepalive.h"
>  #include "lib/vswitch-idl.h"
>  #include "openvswitch/vlog.h"
> +#include "process.h"
>  #include "seq.h"
>  #include "timeval.h"
>  
> @@ -28,11 +29,19 @@ static bool keepalive_enable = false;      /* Keepalive disabled by default. */
>  static uint32_t keepalive_timer_interval;  /* keepalive timer interval. */
>  static struct keepalive_info ka_info;
>  
> -/* Returns true if keepalive is enabled, false otherwise. */
> -bool
> -ka_is_enabled(void)
> +/* Returns true if state update is allowed, false otherwise. */
> +static bool
> +ka_can_update_state(void)
>  {
> -    return keepalive_enable;
> +    bool reload_inprogress;
> +    bool ka_enable;
> +
> +    atomic_read_relaxed(&ka_info.reload_threads, &reload_inprogress);
> +    ka_enable = ka_is_enabled();
> +

no need for a ka_enable variable here

> +    /* Return true if KA is enabled and 'cached_process_list' map reload
> +     * is completed. */
> +    return ka_enable && !reload_inprogress;
>  }
>  
>  /* Finds the thread by 'tid' in 'process_list' map and update
> @@ -49,7 +58,7 @@ ka_set_thread_state_ts(pid_t tid, enum keepalive_state state,
>      ovs_mutex_lock(&ka_info.proclist_mutex);
>      HMAP_FOR_EACH_WITH_HASH (pinfo, node, hash_int(tid, 0),
>                               &ka_info.process_list) {
> -        if (pinfo->tid == tid) {
> +        if (OVS_LIKELY(pinfo->tid == tid)) {

I don't see why this should be unlikely, or changed in this patch

>              pinfo->state = state;
>              pinfo->last_seen_time = last_alive;
>          }
> @@ -104,6 +113,177 @@ ka_register_relay_cb(ka_relay_cb cb, void *aux)
>      ka_info.relay_cb_data = aux;
>  }
>  
> +/* Returns true if keepalive is enabled, false otherwise. */
> +bool
> +ka_is_enabled(void)
> +{
> +    return keepalive_enable;
> +}
> +
> +/* Return the Keepalive timer interval. */
> +uint32_t
> +get_ka_interval(void)
> +{
> +    return keepalive_timer_interval;
> +}
> +
> +/* 'cached_process_list' map reload in progress.
> + *
> + * Should be called before the 'ka_info.cached_process_list'
> + * is populated from 'ka_info.process_list'. This way the pmd
> + * doesn't heartbeat while the reload is in progress. */
> +void
> +ka_reload_datapath_threads_begin(void)
> +{
> +    atomic_store_relaxed(&ka_info.reload_threads, true);
> +}
> +
> +/* 'cached_process_list' map reload finished.
> + *
> + * Should be called after the 'ka_info.cached_process_list'
> + * is populated from 'ka_info.process_list'. This way the pmd
> + * can restart heartbeat when the reload is finished. */
> +void
> +ka_reload_datapath_threads_end(void)
> +{
> +    atomic_store_relaxed(&ka_info.reload_threads, false);
> +}
> +
> +/* Register thread to KA framework. */
> +void
> +ka_register_thread(pid_t tid)
> +{
> +    if (ka_is_enabled()) {
> +        struct ka_process_info *ka_pinfo;
> +        int core_id = -1;
> +        char proc_name[18] = "UNDEFINED";
> +
> +        struct process_info pinfo;
> +        int success = get_process_info(tid, &pinfo);
> +        if (success) {

you could remove variable here

> +            core_id = pinfo.core_id;
> +            ovs_strlcpy(proc_name, pinfo.name, sizeof proc_name);
> +        }
> +
> +        uint32_t hash = hash_int(tid, 0);
> +        ovs_mutex_lock(&ka_info.proclist_mutex);
> +        HMAP_FOR_EACH_WITH_HASH (ka_pinfo, node,
> +                                 hash, &ka_info.process_list) {
> +            /* Thread is already registered. */
> +            if (ka_pinfo->tid == tid) {
> +                goto out;
> +            }
> +        }
> +
> +        ka_pinfo = xmalloc(sizeof *ka_pinfo);
> +        ka_pinfo->tid = tid;
> +        ka_pinfo->core_id = core_id;
> +        ovs_strlcpy(ka_pinfo->name, proc_name, sizeof ka_pinfo->name);
> +
> +        hmap_insert(&ka_info.process_list, &ka_pinfo->node, hash);
> +
> +        ka_pinfo->state = KA_STATE_ALIVE;
> +        ka_pinfo->last_seen_time = time_wall_msec();

Is it right to set as ALIVE here? this is just registering the threads.

> +        ka_info.thread_cnt++;  /* Increment count of registered threads. */
> +out:
> +        ovs_mutex_unlock(&ka_info.proclist_mutex);
> +    }
> +}
> +
> +/* Unregister thread from KA framework. */
> +void
> +ka_unregister_thread(pid_t tid)
> +{
> +    if (ka_is_enabled()) {
> +        struct ka_process_info *ka_pinfo;
> +
> +        ovs_mutex_lock(&ka_info.proclist_mutex);
> +        HMAP_FOR_EACH_WITH_HASH (ka_pinfo, node, hash_int(tid, 0),
> +                                 &ka_info.process_list) {
> +            /* If thread is registered, remove it from the list */
> +            if (ka_pinfo->tid == tid) {
> +                hmap_remove(&ka_info.process_list, &ka_pinfo->node);
> +                free(ka_pinfo);
> +
> +                ka_pinfo->state = KA_STATE_UNUSED;
> +                ka_info.thread_cnt--;  /* Decrement thread count. */
> +                break;
> +            }
> +        }
> +        ovs_mutex_unlock(&ka_info.proclist_mutex);
> +    }
> +}
> +
> +/* Free the 'ka_info.cached_process_list' list. */
> +void
> +ka_free_cached_threads(void)
> +{
> +    struct ka_process_info *pinfo_cached;
> +    /* Free threads in the cached list. */
> +    HMAP_FOR_EACH_POP (pinfo_cached, node, &ka_info.cached_process_list) {
> +        free(pinfo_cached);
> +    }
> +    hmap_shrink(&ka_info.cached_process_list);
> +}
> +
> +/* Cache the list of registered threads from 'ka_info.process_list'
> + * map into 'ka_info.cached_process_list.
> + *
> + * 'cached_process_list' map is an exact copy of 'process_list' that will
> + * be updated by 'pmd' and 'ovs_keepalive' threads as part of heartbeat
> + * mechanism.  This cached copy is created so that the heartbeats can be
> + * performed with out acquiring locks.
> + *
> + * On datapath reconfiguration, both the 'process_list' and the cached copy
> + * 'cached_process_list' is updated after setting 'reload_threads' to 'true'
> + * so that pmd doesn't heartbeat while the maps are updated.
> + *
> + */
> +void
> +ka_cache_registered_threads(void)
> +{
> +    struct ka_process_info *pinfo, *next, *pinfo_cached;
> +
> +    ka_free_cached_threads();
> +
> +    HMAP_FOR_EACH_SAFE (pinfo, next, node, &ka_info.process_list) {
> +        pinfo_cached = xmemdup(pinfo, sizeof *pinfo_cached);
> +        hmap_insert(&ka_info.cached_process_list, &pinfo_cached->node,
> +                     hash_int(pinfo->tid,0));
> +    }
> +}
> +
> +/* Mark packet processing thread alive. */
> +void
> +ka_mark_pmd_thread_alive(int tid)
> +{
> +    if (ka_can_update_state()) {
> +        struct ka_process_info *pinfo;
> +        HMAP_FOR_EACH_WITH_HASH (pinfo, node, hash_int(tid, 0),
> +                             &ka_info.cached_process_list) {
> +            if (OVS_LIKELY(pinfo->tid == tid)) {
> +                pinfo->state = KA_STATE_ALIVE;
> +            }
> +        }
> +    }
> +}
> +
> +/* Mark packet processing thread as sleeping. */
> +void
> +ka_mark_pmd_thread_sleep(int tid)
> +{
> +    if (ka_can_update_state()) {
> +        struct ka_process_info *pinfo;
> +
> +        HMAP_FOR_EACH_WITH_HASH (pinfo, node, hash_int(tid, 0),
> +                             &ka_info.cached_process_list) {
> +            if (pinfo->tid == tid) {
> +                pinfo->state = KA_STATE_SLEEP;
> +            }
> +        }
> +    }
> +}

this isn't used in the patch series atm, but maybe it should be used for
pmds which don't have rxqs. It could also be used if OVS used an
interrupt mode or other reason to sleep a pmd.

> +
>  void
>  ka_init(const struct smap *ovs_other_config)
>  {
> @@ -120,6 +300,7 @@ ka_init(const struct smap *ovs_other_config)
>              ka_register_relay_cb(ka_update_thread_state, NULL);
>              ovs_mutex_init(&ka_info.proclist_mutex);
>              hmap_init(&ka_info.process_list);
> +            hmap_init(&ka_info.cached_process_list);
>  
>              ka_info.init_time = time_wall_msec();
>  
> @@ -143,5 +324,8 @@ ka_destroy(void)
>      ovs_mutex_unlock(&ka_info.proclist_mutex);
>  
>      hmap_destroy(&ka_info.process_list);
> +
> +    ka_free_cached_threads();
> +    hmap_destroy(&ka_info.cached_process_list);
>      ovs_mutex_destroy(&ka_info.proclist_mutex);
>  }
> diff --git a/lib/keepalive.h b/lib/keepalive.h
> index a738daa..7674ea3 100644
> --- a/lib/keepalive.h
> +++ b/lib/keepalive.h
> @@ -48,6 +48,9 @@ enum keepalive_state {
>  };
>  
>  struct ka_process_info {
> +    /* Process name. */
> +    char name[16];
> +
>      /* Thread id of the process, retrieved using ovs_gettid(). */
>      pid_t tid;
>  
> @@ -71,15 +74,32 @@ struct keepalive_info {
>      /* List of process/threads monitored by KA framework. */
>      struct hmap process_list OVS_GUARDED;
>  
> +    /* cached copy of 'process_list' list. */
> +    struct hmap cached_process_list;
> +
> +    /* count of threads registered to KA framework. */
> +    uint32_t thread_cnt;
> +
>      /* Keepalive initialization time. */
>      uint64_t init_time;
>  
>      /* keepalive relay handler. */
>      ka_relay_cb relay_cb;
>      void *relay_cb_data;
> +
> +    atomic_bool reload_threads;   /* Reload threads in to cached list. */
>  };
>  
>  bool ka_is_enabled(void);
> +uint32_t get_ka_interval(void);
> +void ka_reload_datapath_threads_begin(void);
> +void ka_reload_datapath_threads_end(void);
> +void ka_register_thread(pid_t);
> +void ka_unregister_thread(pid_t);
> +void ka_free_cached_threads(void);
> +void ka_cache_registered_threads(void);
> +void ka_mark_pmd_thread_alive(int);
> +void ka_mark_pmd_thread_sleep(int);
>  void ka_init(const struct smap *);
>  void ka_destroy(void);
>  
> diff --git a/lib/ovs-thread.c b/lib/ovs-thread.c
> index f8bc06d..ae8e450 100644
> --- a/lib/ovs-thread.c
> +++ b/lib/ovs-thread.c
> @@ -597,6 +597,12 @@ thread_is_pmd(void)
>      return !strncmp(name, "pmd", 3);
>  }
>  
> +void
> +ovsthread_set_tid(pid_t *tid)
> +{
> +    *tid = ovs_get_tid();
> +}
> +
>  
>  /* ovsthread_key. */
>  
> diff --git a/lib/ovs-thread.h b/lib/ovs-thread.h
> index 55e51a4..cfb4b04 100644
> --- a/lib/ovs-thread.h
> +++ b/lib/ovs-thread.h
> @@ -524,5 +524,6 @@ bool may_fork(void);
>  
>  int count_cpu_cores(void);
>  bool thread_is_pmd(void);
> +void ovsthread_set_tid(pid_t *);
>  
>  #endif /* ovs-thread.h */
> diff --git a/lib/util.c b/lib/util.c
> index 2965656..927929b 100644
> --- a/lib/util.c
> +++ b/lib/util.c
> @@ -26,6 +26,12 @@
>  #include <stdlib.h>
>  #include <string.h>
>  #include <sys/stat.h>
> +#ifdef __linux__
> +#include <sys/syscall.h>
> +#endif
> +#ifdef __FreeBSD__
> +#include <sys/thr.h>
> +#endif
>  #include <unistd.h>
>  #include "bitmap.h"
>  #include "byte-order.h"
> @@ -575,6 +581,22 @@ get_page_size(void)
>      return cached;
>  }
>  
> +/* Returns the tid of the calling thread if supported, -EINVAL otherwise. */
> +pid_t
> +ovs_get_tid(void)
> +{
> +#ifdef __linux__
> +    return syscall(SYS_gettid);
> +#elif defined(__FreeBSD__) || defined(__NetBSD__)
> +    long tid;
> +    thr_self(&tid);
> +    return (pid_t)tid;
> +#endif
> +
> +    VLOG_ERR("ovs_get_tid(): unsupported.");
> +    return -EINVAL;
> +}
> +
>  /* Returns the time at which the system booted, as the number of milliseconds
>   * since the epoch, or 0 if the time of boot cannot be determined. */
>  long long int
> diff --git a/lib/util.h b/lib/util.h
> index b01f421..259346d 100644
> --- a/lib/util.h
> +++ b/lib/util.h
> @@ -156,6 +156,7 @@ void free_cacheline(void *);
>  
>  void ovs_strlcpy(char *dst, const char *src, size_t size);
>  void ovs_strzcpy(char *dst, const char *src, size_t size);
> +pid_t ovs_get_tid(void);
>  
>  /* The C standards say that neither the 'dst' nor 'src' argument to
>   * memcpy() may be null, even if 'n' is zero.  This wrapper tolerates
>
diff mbox series

Patch

diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index 07f6113..c978a76 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -49,6 +49,7 @@ 
 #include "flow.h"
 #include "hmapx.h"
 #include "id-pool.h"
+#include "keepalive.h"
 #include "latch.h"
 #include "netdev.h"
 #include "netdev-vport.h"
@@ -592,6 +593,7 @@  struct dp_netdev_pmd_thread {
     atomic_bool reload;             /* Do we need to reload ports? */
     pthread_t thread;
     unsigned core_id;               /* CPU core id of this pmd thread. */
+    pid_t tid;                      /* PMD thread tid. */
     int numa_id;                    /* numa node id of this pmd thread. */
     bool isolated;
 
@@ -1018,6 +1020,72 @@  sorted_poll_thread_list(struct dp_netdev *dp,
     *n = k;
 }
 
+static void *
+ovs_keepalive(void *f_ OVS_UNUSED)
+{
+    pthread_detach(pthread_self());
+
+    for (;;) {
+        uint64_t interval;
+
+        interval = get_ka_interval();
+        xnanosleep(interval * 1000 * 1000);
+    }
+
+    return NULL;
+}
+
+/* Kickstart 'ovs_keepalive' thread. */
+static void
+ka_thread_start(struct dp_netdev *dp)
+{
+    static struct ovsthread_once once = OVSTHREAD_ONCE_INITIALIZER;
+
+    if (ovsthread_once_start(&once)) {
+        ovs_thread_create("ovs_keepalive", ovs_keepalive, dp);
+
+        ovsthread_once_done(&once);
+    }
+}
+
+/* Register the datapath threads. This gets invoked on every datapath
+ * reconfiguration. The pmd thread[s] having rxq[s] mapped will be
+ * registered to KA framework.
+ */
+static void
+ka_register_datapath_threads(struct dp_netdev *dp)
+{
+    if (!ka_is_enabled()) {
+        return;
+    }
+
+    ka_thread_start(dp);
+
+    ka_reload_datapath_threads_begin();
+
+    struct dp_netdev_pmd_thread *pmd;
+    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+        /*  Register only PMD threads. */
+        if (pmd->core_id != NON_PMD_CORE_ID) {
+            /* Skip PMD thread with no rxqs mapping. */
+            if (OVS_UNLIKELY(!hmap_count(&pmd->poll_list))) {
+                /* Rxq mapping changes due to datapath reconfiguration.
+                 * If no rxqs mapped to PMD now due to reconfiguration,
+                 * unregister the pmd thread. */
+                ka_unregister_thread(pmd->tid);
+                continue;
+            }
+
+            ka_register_thread(pmd->tid);
+            VLOG_INFO("Registered PMD thread [%d] on Core[%d] to KA framework",
+                      pmd->tid, pmd->core_id);
+        }
+    }
+    ka_cache_registered_threads();
+
+    ka_reload_datapath_threads_end();
+}
+
 static void
 dpif_netdev_pmd_rebalance(struct unixctl_conn *conn, int argc,
                           const char *argv[], void *aux OVS_UNUSED)
@@ -3819,6 +3887,9 @@  reconfigure_datapath(struct dp_netdev *dp)
 
     /* Reload affected pmd threads. */
     reload_affected_pmds(dp);
+
+    /* Register datapath threads to KA monitoring. */
+    ka_register_datapath_threads(dp);
 }
 
 /* Returns true if one of the netdevs in 'dp' requires a reconfiguration */
@@ -4023,6 +4094,8 @@  pmd_thread_main(void *f_)
 
     /* Stores the pmd thread's 'pmd' to 'per_pmd_key'. */
     ovsthread_setspecific(pmd->dp->per_pmd_key, pmd);
+    /* Stores tid in to 'pmd->tid'. */
+    ovsthread_set_tid(&pmd->tid);
     ovs_numa_thread_setaffinity_core(pmd->core_id);
     dpdk_set_lcore_id(pmd->core_id);
     poll_cnt = pmd_load_queues_and_ports(pmd, &poll_list);
@@ -4056,6 +4129,9 @@  reload:
                                                       : PMD_CYCLES_IDLE);
         }
 
+        /* Mark PMD thread alive. */
+        ka_mark_pmd_thread_alive(pmd->tid);
+
         if (lc++ > 1024) {
             bool reload;
 
@@ -4089,6 +4165,9 @@  reload:
     }
 
     emc_cache_uninit(&pmd->flow_cache);
+
+    ka_unregister_thread(pmd->tid);
+
     free(poll_list);
     pmd_free_cached_ports(pmd);
     return NULL;
diff --git a/lib/keepalive.c b/lib/keepalive.c
index ca8dccb..b04877f 100644
--- a/lib/keepalive.c
+++ b/lib/keepalive.c
@@ -19,6 +19,7 @@ 
 #include "keepalive.h"
 #include "lib/vswitch-idl.h"
 #include "openvswitch/vlog.h"
+#include "process.h"
 #include "seq.h"
 #include "timeval.h"
 
@@ -28,11 +29,19 @@  static bool keepalive_enable = false;      /* Keepalive disabled by default. */
 static uint32_t keepalive_timer_interval;  /* keepalive timer interval. */
 static struct keepalive_info ka_info;
 
-/* Returns true if keepalive is enabled, false otherwise. */
-bool
-ka_is_enabled(void)
+/* Returns true if state update is allowed, false otherwise. */
+static bool
+ka_can_update_state(void)
 {
-    return keepalive_enable;
+    bool reload_inprogress;
+    bool ka_enable;
+
+    atomic_read_relaxed(&ka_info.reload_threads, &reload_inprogress);
+    ka_enable = ka_is_enabled();
+
+    /* Return true if KA is enabled and 'cached_process_list' map reload
+     * is completed. */
+    return ka_enable && !reload_inprogress;
 }
 
 /* Finds the thread by 'tid' in 'process_list' map and update
@@ -49,7 +58,7 @@  ka_set_thread_state_ts(pid_t tid, enum keepalive_state state,
     ovs_mutex_lock(&ka_info.proclist_mutex);
     HMAP_FOR_EACH_WITH_HASH (pinfo, node, hash_int(tid, 0),
                              &ka_info.process_list) {
-        if (pinfo->tid == tid) {
+        if (OVS_LIKELY(pinfo->tid == tid)) {
             pinfo->state = state;
             pinfo->last_seen_time = last_alive;
         }
@@ -104,6 +113,177 @@  ka_register_relay_cb(ka_relay_cb cb, void *aux)
     ka_info.relay_cb_data = aux;
 }
 
+/* Returns true if keepalive is enabled, false otherwise. */
+bool
+ka_is_enabled(void)
+{
+    return keepalive_enable;
+}
+
+/* Return the Keepalive timer interval. */
+uint32_t
+get_ka_interval(void)
+{
+    return keepalive_timer_interval;
+}
+
+/* 'cached_process_list' map reload in progress.
+ *
+ * Should be called before the 'ka_info.cached_process_list'
+ * is populated from 'ka_info.process_list'. This way the pmd
+ * doesn't heartbeat while the reload is in progress. */
+void
+ka_reload_datapath_threads_begin(void)
+{
+    atomic_store_relaxed(&ka_info.reload_threads, true);
+}
+
+/* 'cached_process_list' map reload finished.
+ *
+ * Should be called after the 'ka_info.cached_process_list'
+ * is populated from 'ka_info.process_list'. This way the pmd
+ * can restart heartbeat when the reload is finished. */
+void
+ka_reload_datapath_threads_end(void)
+{
+    atomic_store_relaxed(&ka_info.reload_threads, false);
+}
+
+/* Register thread to KA framework. */
+void
+ka_register_thread(pid_t tid)
+{
+    if (ka_is_enabled()) {
+        struct ka_process_info *ka_pinfo;
+        int core_id = -1;
+        char proc_name[18] = "UNDEFINED";
+
+        struct process_info pinfo;
+        int success = get_process_info(tid, &pinfo);
+        if (success) {
+            core_id = pinfo.core_id;
+            ovs_strlcpy(proc_name, pinfo.name, sizeof proc_name);
+        }
+
+        uint32_t hash = hash_int(tid, 0);
+        ovs_mutex_lock(&ka_info.proclist_mutex);
+        HMAP_FOR_EACH_WITH_HASH (ka_pinfo, node,
+                                 hash, &ka_info.process_list) {
+            /* Thread is already registered. */
+            if (ka_pinfo->tid == tid) {
+                goto out;
+            }
+        }
+
+        ka_pinfo = xmalloc(sizeof *ka_pinfo);
+        ka_pinfo->tid = tid;
+        ka_pinfo->core_id = core_id;
+        ovs_strlcpy(ka_pinfo->name, proc_name, sizeof ka_pinfo->name);
+
+        hmap_insert(&ka_info.process_list, &ka_pinfo->node, hash);
+
+        ka_pinfo->state = KA_STATE_ALIVE;
+        ka_pinfo->last_seen_time = time_wall_msec();
+        ka_info.thread_cnt++;  /* Increment count of registered threads. */
+out:
+        ovs_mutex_unlock(&ka_info.proclist_mutex);
+    }
+}
+
+/* Unregister thread from KA framework. */
+void
+ka_unregister_thread(pid_t tid)
+{
+    if (ka_is_enabled()) {
+        struct ka_process_info *ka_pinfo;
+
+        ovs_mutex_lock(&ka_info.proclist_mutex);
+        HMAP_FOR_EACH_WITH_HASH (ka_pinfo, node, hash_int(tid, 0),
+                                 &ka_info.process_list) {
+            /* If thread is registered, remove it from the list */
+            if (ka_pinfo->tid == tid) {
+                hmap_remove(&ka_info.process_list, &ka_pinfo->node);
+                free(ka_pinfo);
+
+                ka_pinfo->state = KA_STATE_UNUSED;
+                ka_info.thread_cnt--;  /* Decrement thread count. */
+                break;
+            }
+        }
+        ovs_mutex_unlock(&ka_info.proclist_mutex);
+    }
+}
+
+/* Free the 'ka_info.cached_process_list' list. */
+void
+ka_free_cached_threads(void)
+{
+    struct ka_process_info *pinfo_cached;
+    /* Free threads in the cached list. */
+    HMAP_FOR_EACH_POP (pinfo_cached, node, &ka_info.cached_process_list) {
+        free(pinfo_cached);
+    }
+    hmap_shrink(&ka_info.cached_process_list);
+}
+
+/* Cache the list of registered threads from 'ka_info.process_list'
+ * map into 'ka_info.cached_process_list.
+ *
+ * 'cached_process_list' map is an exact copy of 'process_list' that will
+ * be updated by 'pmd' and 'ovs_keepalive' threads as part of heartbeat
+ * mechanism.  This cached copy is created so that the heartbeats can be
+ * performed with out acquiring locks.
+ *
+ * On datapath reconfiguration, both the 'process_list' and the cached copy
+ * 'cached_process_list' is updated after setting 'reload_threads' to 'true'
+ * so that pmd doesn't heartbeat while the maps are updated.
+ *
+ */
+void
+ka_cache_registered_threads(void)
+{
+    struct ka_process_info *pinfo, *next, *pinfo_cached;
+
+    ka_free_cached_threads();
+
+    HMAP_FOR_EACH_SAFE (pinfo, next, node, &ka_info.process_list) {
+        pinfo_cached = xmemdup(pinfo, sizeof *pinfo_cached);
+        hmap_insert(&ka_info.cached_process_list, &pinfo_cached->node,
+                     hash_int(pinfo->tid,0));
+    }
+}
+
+/* Mark packet processing thread alive. */
+void
+ka_mark_pmd_thread_alive(int tid)
+{
+    if (ka_can_update_state()) {
+        struct ka_process_info *pinfo;
+        HMAP_FOR_EACH_WITH_HASH (pinfo, node, hash_int(tid, 0),
+                             &ka_info.cached_process_list) {
+            if (OVS_LIKELY(pinfo->tid == tid)) {
+                pinfo->state = KA_STATE_ALIVE;
+            }
+        }
+    }
+}
+
+/* Mark packet processing thread as sleeping. */
+void
+ka_mark_pmd_thread_sleep(int tid)
+{
+    if (ka_can_update_state()) {
+        struct ka_process_info *pinfo;
+
+        HMAP_FOR_EACH_WITH_HASH (pinfo, node, hash_int(tid, 0),
+                             &ka_info.cached_process_list) {
+            if (pinfo->tid == tid) {
+                pinfo->state = KA_STATE_SLEEP;
+            }
+        }
+    }
+}
+
 void
 ka_init(const struct smap *ovs_other_config)
 {
@@ -120,6 +300,7 @@  ka_init(const struct smap *ovs_other_config)
             ka_register_relay_cb(ka_update_thread_state, NULL);
             ovs_mutex_init(&ka_info.proclist_mutex);
             hmap_init(&ka_info.process_list);
+            hmap_init(&ka_info.cached_process_list);
 
             ka_info.init_time = time_wall_msec();
 
@@ -143,5 +324,8 @@  ka_destroy(void)
     ovs_mutex_unlock(&ka_info.proclist_mutex);
 
     hmap_destroy(&ka_info.process_list);
+
+    ka_free_cached_threads();
+    hmap_destroy(&ka_info.cached_process_list);
     ovs_mutex_destroy(&ka_info.proclist_mutex);
 }
diff --git a/lib/keepalive.h b/lib/keepalive.h
index a738daa..7674ea3 100644
--- a/lib/keepalive.h
+++ b/lib/keepalive.h
@@ -48,6 +48,9 @@  enum keepalive_state {
 };
 
 struct ka_process_info {
+    /* Process name. */
+    char name[16];
+
     /* Thread id of the process, retrieved using ovs_gettid(). */
     pid_t tid;
 
@@ -71,15 +74,32 @@  struct keepalive_info {
     /* List of process/threads monitored by KA framework. */
     struct hmap process_list OVS_GUARDED;
 
+    /* cached copy of 'process_list' list. */
+    struct hmap cached_process_list;
+
+    /* count of threads registered to KA framework. */
+    uint32_t thread_cnt;
+
     /* Keepalive initialization time. */
     uint64_t init_time;
 
     /* keepalive relay handler. */
     ka_relay_cb relay_cb;
     void *relay_cb_data;
+
+    atomic_bool reload_threads;   /* Reload threads in to cached list. */
 };
 
 bool ka_is_enabled(void);
+uint32_t get_ka_interval(void);
+void ka_reload_datapath_threads_begin(void);
+void ka_reload_datapath_threads_end(void);
+void ka_register_thread(pid_t);
+void ka_unregister_thread(pid_t);
+void ka_free_cached_threads(void);
+void ka_cache_registered_threads(void);
+void ka_mark_pmd_thread_alive(int);
+void ka_mark_pmd_thread_sleep(int);
 void ka_init(const struct smap *);
 void ka_destroy(void);
 
diff --git a/lib/ovs-thread.c b/lib/ovs-thread.c
index f8bc06d..ae8e450 100644
--- a/lib/ovs-thread.c
+++ b/lib/ovs-thread.c
@@ -597,6 +597,12 @@  thread_is_pmd(void)
     return !strncmp(name, "pmd", 3);
 }
 
+void
+ovsthread_set_tid(pid_t *tid)
+{
+    *tid = ovs_get_tid();
+}
+
 
 /* ovsthread_key. */
 
diff --git a/lib/ovs-thread.h b/lib/ovs-thread.h
index 55e51a4..cfb4b04 100644
--- a/lib/ovs-thread.h
+++ b/lib/ovs-thread.h
@@ -524,5 +524,6 @@  bool may_fork(void);
 
 int count_cpu_cores(void);
 bool thread_is_pmd(void);
+void ovsthread_set_tid(pid_t *);
 
 #endif /* ovs-thread.h */
diff --git a/lib/util.c b/lib/util.c
index 2965656..927929b 100644
--- a/lib/util.c
+++ b/lib/util.c
@@ -26,6 +26,12 @@ 
 #include <stdlib.h>
 #include <string.h>
 #include <sys/stat.h>
+#ifdef __linux__
+#include <sys/syscall.h>
+#endif
+#ifdef __FreeBSD__
+#include <sys/thr.h>
+#endif
 #include <unistd.h>
 #include "bitmap.h"
 #include "byte-order.h"
@@ -575,6 +581,22 @@  get_page_size(void)
     return cached;
 }
 
+/* Returns the tid of the calling thread if supported, -EINVAL otherwise. */
+pid_t
+ovs_get_tid(void)
+{
+#ifdef __linux__
+    return syscall(SYS_gettid);
+#elif defined(__FreeBSD__) || defined(__NetBSD__)
+    long tid;
+    thr_self(&tid);
+    return (pid_t)tid;
+#endif
+
+    VLOG_ERR("ovs_get_tid(): unsupported.");
+    return -EINVAL;
+}
+
 /* Returns the time at which the system booted, as the number of milliseconds
  * since the epoch, or 0 if the time of boot cannot be determined. */
 long long int
diff --git a/lib/util.h b/lib/util.h
index b01f421..259346d 100644
--- a/lib/util.h
+++ b/lib/util.h
@@ -156,6 +156,7 @@  void free_cacheline(void *);
 
 void ovs_strlcpy(char *dst, const char *src, size_t size);
 void ovs_strzcpy(char *dst, const char *src, size_t size);
+pid_t ovs_get_tid(void);
 
 /* The C standards say that neither the 'dst' nor 'src' argument to
  * memcpy() may be null, even if 'n' is zero.  This wrapper tolerates