diff mbox

[ovs-dev,RFC,v3,06/18] dpif-netdev: Register packet processing cores to KA framework.

Message ID 1497813871-27572-7-git-send-email-bhanuprakash.bodireddy@intel.com
State RFC
Headers show

Commit Message

Bodireddy, Bhanuprakash June 18, 2017, 7:24 p.m. UTC
This commit registers the packet processing PMD cores with the keepalive
framework. Only PMDs that have rxqs mapped are registered and actively
monitored by the KA framework.

This commit also spawns a keepalive thread that will dispatch heartbeats to
the PMD cores. The PMD threads respond to heartbeats by marking themselves
alive. As long as a PMD responds to heartbeats, it is considered 'healthy'.

Signed-off-by: Bhanuprakash Bodireddy <bhanuprakash.bodireddy@intel.com>
---
 lib/dpif-netdev.c | 100 +++++++++++++++++++++++++++++++++++++++++
 lib/keepalive.c   | 130 +++++++++++++++++++++++++++++++++++++++++++++++-------
 lib/keepalive.h   |  25 ++++++++++-
 3 files changed, 236 insertions(+), 19 deletions(-)
diff mbox

Patch

diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index ce141e8..4b7c835 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -72,6 +72,7 @@ 
 #include "seq.h"
 #include "smap.h"
 #include "sset.h"
+#include "svec.h"
 #include "timeval.h"
 #include "tnl-neigh-cache.h"
 #include "tnl-ports.h"
@@ -970,6 +971,96 @@  sorted_poll_thread_list(struct dp_netdev *dp,
     *n = k;
 }
 
+/* Keepalive thread body: loops forever, sleeping one interval per pass. */
+static void *
+ovs_keepalive(void *f_ OVS_UNUSED)
+{
+    pthread_detach(pthread_self());
+    while (true) {
+        /* NOTE(review): POSIX lets usleep() reject values >= 1000000 us. */
+        ovsrcu_quiesce_start();
+        usleep(get_ka_interval() * 1000);
+        ovsrcu_quiesce_end();
+    }
+    return NULL;
+}
+
+/* Spawns the "ovs_keepalive" thread, passing 'dp', on the first call only. */
+static void
+ka_thread_start(struct dp_netdev *dp)
+{
+    static struct ovsthread_once started = OVSTHREAD_ONCE_INITIALIZER;
+    if (!ovsthread_once_start(&started)) {
+        return;
+    }
+    ovs_thread_create("ovs_keepalive", ovs_keepalive, dp);
+    ovsthread_once_done(&started);
+}
+
+/* Stores in '*num_poll_ports' the number of distinct ports polled by 'pmd',
+ * logging each port name at debug level. */
+static void
+pmd_num_poll_ports(struct dp_netdev_pmd_thread *pmd, int *num_poll_ports)
+{
+    struct svec port_names;
+    struct rxq_poll *poll;
+    const char *name;
+    int idx = 0;
+    svec_init(&port_names);
+    HMAP_FOR_EACH (poll, node, &pmd->poll_list) {
+        svec_add(&port_names, netdev_rxq_get_name(poll->rxq->rx));
+    }
+    /* With MQ enabled, remove the duplicates. */
+    svec_sort_unique(&port_names);
+    SVEC_FOR_EACH (idx, name, &port_names) {
+        VLOG_DBG("%d Port:%s", idx, name);
+    }
+    svec_destroy(&port_names);
+    /* Relies on the loop above leaving 'idx' at the count of unique names. */
+    *num_poll_ports = idx;
+    VLOG_DBG("PMD thread [%d] polling [%d] ports",
+             pmd->core_id, *num_poll_ports);
+}
+
+/* Registers every PMD thread in 'dp' that polls at least one rxq with the
+ * keepalive framework, starting the keepalive thread first.  Does nothing
+ * if get_ka_init_status() reports that keepalive was not initialized. */
+static void
+ka_register_datapath_threads(struct dp_netdev *dp)
+{
+    struct dp_netdev_pmd_thread *pmd;
+    int ka_init = get_ka_init_status();
+    VLOG_DBG("Keepalive: Was initialization successful? [%s]",
+             ka_init ? "Success" : "Failure");
+    if (!ka_init) {
+        return;
+    }
+    ka_thread_start(dp);
+
+    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+        int nports;
+        int tid;
+
+        /* Skip threads with no rxqs mapped, and the non-PMD thread. */
+        if (OVS_UNLIKELY(!hmap_count(&pmd->poll_list))
+            || pmd->core_id == NON_PMD_CORE_ID) {
+            continue;
+        }
+
+        pmd_num_poll_ports(pmd, &nports);
+        if (ka_alloc_portstats(pmd->core_id, nports)) {
+            VLOG_FATAL("Unable to allocate memory for PMD core %d",
+                       pmd->core_id);
+            return;
+        }
+
+        tid = ka_get_pmd_tid(pmd->core_id);
+        ka_register_thread(tid, true);
+        VLOG_DBG("Registered PMD thread [%d] on Core [%d] to KA framework",
+                 tid, pmd->core_id);
+    }
+}
+
 static void
 dpif_netdev_pmd_info(struct unixctl_conn *conn, int argc, const char *argv[],
                      void *aux)
@@ -3541,6 +3632,9 @@  reconfigure_datapath(struct dp_netdev *dp)
 
     /* Reload affected pmd threads. */
     reload_affected_pmds(dp);
+
+    /* Register datapath threads to KA monitoring. */
+    ka_register_datapath_threads(dp);
 }
 
 /* Returns true if one of the netdevs in 'dp' requires a reconfiguration */
@@ -3740,6 +3834,9 @@  reload:
                                        poll_list[i].port_no);
         }
 
+        /* Mark PMD thread alive. */
+        ka_mark_pmd_thread_alive();
+
         if (lc++ > 1024) {
             bool reload;
 
@@ -3770,6 +3867,9 @@  reload:
         goto reload;
     }
 
+    int tid = ka_get_pmd_tid(pmd->core_id);
+    ka_unregister_thread(tid, true);
+
     free(poll_list);
     pmd_free_cached_ports(pmd);
     return NULL;
diff --git a/lib/keepalive.c b/lib/keepalive.c
index 54faf49..64ab117 100644
--- a/lib/keepalive.c
+++ b/lib/keepalive.c
@@ -25,6 +25,7 @@ 
 #include "keepalive.h"
 #include "lib/vswitch-idl.h"
 #include "openvswitch/vlog.h"
+#include "process.h"
 
 VLOG_DEFINE_THIS_MODULE(keepalive);
 
@@ -76,21 +77,77 @@  ka_store_pmd_id(unsigned core_idx)
     }
 }
 
-/* Register packet processing PMD thread to KA framework. */
+/* Register thread 'tid' to KA framework; no-op if already registered. */
 void
-ka_register_pmd_thread(int tid OVS_UNUSED, unsigned core_id)
+ka_register_thread(int tid, bool thread_is_pmd)
 {
     if (ka_is_enabled()) {
-        dpdk_register_pmd_core(core_id);
+        struct ka_process_info *pinfo;
+        uint32_t core_id;
+
+        uint32_t hash = hash_int(tid, 0);
+        ovs_mutex_lock(&ka_info->proclist_mutex);
+        HMAP_FOR_EACH_WITH_HASH (pinfo, node, hash, &ka_info->process_list) {
+            /* PMD thread is already registered. */
+            if (pinfo->tid == tid) {
+                goto out;
+            }
+        }
+        /* Zeroed so 'core_state' and 'core_last_seen_times' start defined. */
+        pinfo = xzalloc(sizeof *pinfo);
+        core_id = get_cpu_num(tid);
+        pinfo->tid = tid;
+        pinfo->heartbeats = true;
+        pinfo->core_id = core_id;
+
+        char *pname = get_process_name(tid);
+        if (pname) {
+            ovs_strlcpy(pinfo->name, pname, sizeof pinfo->name);
+            free(pname);
+        } else {
+            ovs_strlcpy(pinfo->name, "UNDEFINED", sizeof pinfo->name);
+        }
+
+        hmap_insert(&ka_info->process_list, &pinfo->node, hash);
+
+        if (thread_is_pmd) {
+            dpdk_register_pmd_core(core_id);
+            ka_info->pmd_cnt++;  /* Increment PMD count. */
+        } else {
+            ka_info->nonpmd_cnt++;  /* Increment non-pmd thread count. */
+        }
+out:
+        ovs_mutex_unlock(&ka_info->proclist_mutex);
     }
 }
 
-/* Unregister packet processing PMD thread from KA framework. */
+/* Unregister thread 'tid' from KA framework; no-op if not registered. */
 void
-ka_unregister_pmd_thread(int tid OVS_UNUSED, unsigned core_id)
+ka_unregister_thread(int tid, bool thread_is_pmd)
 {
     if (ka_is_enabled()) {
-        dpdk_unregister_pmd_core(core_id);
+        struct ka_process_info *pinfo;
+        ovs_mutex_lock(&ka_info->proclist_mutex);
+        HMAP_FOR_EACH_WITH_HASH (pinfo, node, hash_int(tid, 0),
+                                 &ka_info->process_list) {
+            /* If PMD thread is registered, remove it from the list */
+            if (pinfo->tid == tid) {
+                uint32_t core_id = pinfo->core_id;
+                hmap_remove(&ka_info->process_list, &pinfo->node);
+                free(pinfo);
+                /* Release the core that was recorded at registration time. */
+                if (thread_is_pmd) {
+                    dpdk_unregister_pmd_core(core_id);
+                    ka_info->pmd_cnt--;  /* Decrement PMD count. */
+                } else {
+                    ka_info->nonpmd_cnt--;  /* Decrement non-pmd thread cnt. */
+                }
+
+                break;
+            }
+        }
+
+        ovs_mutex_unlock(&ka_info->proclist_mutex);
     }
 }
 
@@ -146,6 +203,42 @@  get_ka_timer_interval(const struct smap *ovs_other_config OVS_UNUSED)
     return ka_interval;
 }
 
+/* Sizes the port statistics array of PMD core 'core_id' to hold 'nports'
+ * entries.  Called on every datapath reconfiguration, so an existing array
+ * is resized rather than allocated afresh.
+ *
+ * Returns 0 on success, or EINVAL if 'core_id' or 'nports' is out of range.
+ * xrealloc() aborts on allocation failure, so ENOMEM is never returned. */
+int
+ka_alloc_portstats(unsigned core_id, int nports)
+{
+    struct pmd_extended_stats *stats;
+
+    if (core_id >= KA_DP_MAXCORES || nports < 0) {
+        return EINVAL;
+    }
+
+    /* xrealloc(NULL, ...) acts like xmalloc(), which covers the first call
+     * for a core; this assumes 'ext_stats' starts out zeroed. */
+    stats = &ka_info->ext_stats[core_id];
+    stats->port_stats = xrealloc(stats->port_stats,
+                                 nports * sizeof *stats->port_stats);
+    stats->num_poll_ports = nports;
+    return 0;
+}
+
+/* Frees every core's port statistics array and resets its slot to empty. */
+void
+ka_destroy_portstats(void)
+{
+    for (int coreid = 0; coreid < KA_DP_MAXCORES; coreid++) {
+        struct pmd_extended_stats *stats = &ka_info->ext_stats[coreid];
+        free(stats->port_stats);    /* free(NULL) is a no-op. */
+        stats->port_stats = NULL;   /* Keeps a later realloc/free valid. */
+        stats->num_poll_ports = 0;
+    }
+}
+
 static struct keepalive_info *
 keepalive_info_create(void)
 {
@@ -206,17 +299,20 @@  ka_init(const struct smap *ovs_other_config)
 void
 ka_destroy(void)
 {
-    if (ka_info) {
-        struct ka_process_info *pinfo;
-        ovs_mutex_lock(&ka_info->proclist_mutex);
-        HMAP_FOR_EACH_POP (pinfo, node, &ka_info->process_list) {
-            free(pinfo);
-        }
-        ovs_mutex_unlock(&ka_info->proclist_mutex);
-        hmap_destroy(&ka_info->process_list);
-
-        ovs_mutex_destroy(&ka_info->proclist_mutex);
+    if (!ka_info) {  /* Nothing to tear down. */
+        return;
+    }
 
-        free(ka_info);
+    struct ka_process_info *pinfo;
+    ovs_mutex_lock(&ka_info->proclist_mutex);
+    HMAP_FOR_EACH_POP (pinfo, node, &ka_info->process_list) {
+        free(pinfo);
     }
+    ovs_mutex_unlock(&ka_info->proclist_mutex);
+    hmap_destroy(&ka_info->process_list);
+
+    ovs_mutex_destroy(&ka_info->proclist_mutex);
+    ka_destroy_portstats();  /* Reads 'ka_info', so must precede free(). */
+    free(ka_info);
+    ka_info = NULL;  /* Lets a repeated ka_destroy() return early. */
 }
diff --git a/lib/keepalive.h b/lib/keepalive.h
index 67f89da..f1e232d 100644
--- a/lib/keepalive.h
+++ b/lib/keepalive.h
@@ -41,13 +41,25 @@  enum keepalive_state {
 };
 
 struct ka_process_info {
+    char name[16];              /* Thread name, or "UNDEFINED" if unknown. */
     int tid;
     int core_id;
+    bool heartbeats;            /* Set to true when the thread registers. */
     enum keepalive_state core_state;
     uint64_t core_last_seen_times;
     struct hmap_node node;
 };
 
+struct poll_port_stats {
+    const char *port;   /* Presumably the polled port's name -- TODO confirm. */
+    int qid;            /* Presumably an rx queue id -- TODO confirm. */
+};
+
+struct pmd_extended_stats {
+    struct poll_port_stats *port_stats; /* Set by ka_alloc_portstats(). */
+    int num_poll_ports;                 /* Entries in 'port_stats'. */
+};
+
 struct keepalive_info {
     /* Mutex for 'process_list'. */
     struct ovs_mutex proclist_mutex;
@@ -55,9 +67,16 @@  struct keepalive_info {
     /* List of process/threads monitored by KA framework. */
     struct hmap process_list OVS_GUARDED;
 
+    /* count of threads registered to KA framework. */
+    uint32_t pmd_cnt;
+    uint32_t nonpmd_cnt;
+
     /* Store Datapath threads 'tid'.
      * In case of DPDK there can be max of KA_DP_MAXCORES threads. */
     pid_t thread_id[KA_DP_MAXCORES];
+
+    /* Additional statistics to monitor health. */
+    struct pmd_extended_stats ext_stats[KA_DP_MAXCORES];
 };
 
 enum keepalive_status {
@@ -71,8 +90,8 @@  void ka_set_pmd_state_ts(unsigned, enum keepalive_state, uint64_t);
 
 int ka_get_pmd_tid(unsigned core);
 bool ka_is_enabled(void);
-void ka_register_pmd_thread(int, unsigned);
-void ka_unregister_pmd_thread(int, unsigned);
+void ka_register_thread(int, bool);
+void ka_unregister_thread(int, bool);
 void ka_mark_pmd_thread_alive(void);
 void ka_mark_pmd_thread_sleep(void);
 
@@ -80,5 +99,7 @@  void ka_store_pmd_id(unsigned core);
 uint32_t get_ka_interval(void);
 int get_ka_init_status(void);
 int ka_get_pmd_tid(unsigned core);
+int ka_alloc_portstats(unsigned, int);
+void ka_destroy_portstats(void);
 
 #endif /* keepalive.h */