diff mbox

[ovs-dev,RFC] netdev-dpdk: Add OVS DPDK keep-alive functionality

Message ID 1467900062-5291-1-git-send-email-bhanuprakash.bodireddy@intel.com
State Superseded
Delegated to: Daniele Di Proietto
Headers show

Commit Message

Bodireddy, Bhanuprakash July 7, 2016, 2:01 p.m. UTC
This patch is aimed towards achieving Fastpath Service Assurance in
OVSDPDK deployments. This commit adds support for monitoring the packet
processing cores(pmd thread cores) by dispatching heartbeats at regular
intervals. Incase of missing heartbeat the failure shall be detected &
reported which will eventually be consumed by higher level fault management
systems.

The implementation uses POSIX shared memory object. keep-alive feature
can be enabled by setting "dpdk-keepalive=true" in ovsdb. When enabled,
'ovs-keepalive' thread shall be spawned that wakes up in regular timer
intervals to update the timestamp, status of pmd cores in shared memory.
The timer interval of the 'ovs-keepalive' thread is configurable in ovsdb
using "dpdk-keepalive-interval=<interval>" and supports millisecond
graunularity.

An external monitoring application like collectd with dpdk plugin
support (dpdkevent plugin) can read the status updates from shared memory.
On a missing heartbeat, the keepalive thread shall increment the
semaphore there by unblocking the collectd thread which will relay the
status to ceilometer. collectd also can detect OVS DPDK crash when the
core last seen timestamp isn't updated for a while in shared memory.
The synchronization between OVS keepalive and collectd threads is
handled by sempahore. Below is the very high level overview of usecase.

        Compute Node                   Controller
   
         Collectd  <-----------------> Ceilometer   
         OVS DPDK     
  

   +-----+
   | VM  |
   +--+--+
  \---+---/
      |
   +--+---+       +------------+----------+     +------+-------+
   | OVS  |-----> |  collectd DPDK plugin | --> |   collectd   |
   +--+---+       +------------+----------+     +------+-------+        
                                                     
 +------+-----+     +---------------+------------+         
 | Ceilometer | <-- | collectd ceilometer plugin |  <----
 +------+-----+     +---------------+------------+

To enable keepalive functionality(Default is false in the ovsdb)
  'ovs-vsctl set Open_vSwitch . other_config:dpdk-keepalive=true'

To set keepalive timer interval(millisecond granularity)
  'ovs-vsctl set Open_vSwitch . other_config:dpdk-keepalive-interval="10"'

The DPDK ka-agent sample application can be used as a monitoring app
to detect and report the core status inplace of collectd.

collectd: Enabling dpdk plugin in collectd is WIP and under
review here, https://github.com/collectd/collectd/pull/1649/commits

Please note that to test this functionality RFC patch support DPDK 16.07 
be applied first, http://openvswitch.org/pipermail/dev/2016-July/074448.html

Signed-off-by: Bhanuprakash Bodireddy <bhanuprakash.bodireddy@intel.com>
---
 lib/dpif-netdev.c    |   9 ++
 lib/netdev-dpdk.c    | 232 ++++++++++++++++++++++++++++++++++++++++++++++++++-
 lib/netdev-dpdk.h    |  29 +++++++
 lib/process.c        |  57 +++++++++++++
 lib/process.h        |  10 +++
 lib/util.c           |  12 +++
 lib/util.h           |   1 +
 vswitchd/vswitch.xml |  28 +++++++
 8 files changed, 377 insertions(+), 1 deletion(-)
diff mbox

Patch

diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index 37c2631..e6baeb5 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -2849,6 +2849,9 @@  pmd_thread_main(void *f_)
     ovs_numa_thread_setaffinity_core(pmd->core_id);
     dpdk_set_lcore_id(pmd->core_id);
     poll_cnt = pmd_load_queues_and_ports(pmd, &poll_list);
+
+    /* Store the pmd thread_id in shared memory. */
+    dpdk_ka_shm_store_tid(pmd->core_id);
 reload:
     emc_cache_init(&pmd->flow_cache);
 
@@ -2864,6 +2867,9 @@  reload:
             dp_netdev_process_rxq_port(pmd, poll_list[i].port, poll_list[i].rx);
         }
 
+        /* Mark core alive. */
+        dpdk_ka_mark_core_alive();
+
         if (lc++ > 1024) {
             unsigned int seq;
 
@@ -3424,6 +3430,9 @@  dp_netdev_set_pmds_on_numa(struct dp_netdev *dp, int numa_id)
 
             dp_netdev_configure_pmd(pmd, dp, core_id, numa_id);
 
+            /* Register core for Keepalive detection. */
+            dpdk_ka_register_core(core_id);
+
             HMAP_FOR_EACH (port, node, &dp->ports) {
                 dp_netdev_add_port_tx_to_pmd(pmd, port);
             }
diff --git a/lib/netdev-dpdk.c b/lib/netdev-dpdk.c
index 492b60d..af5bf20 100644
--- a/lib/netdev-dpdk.c
+++ b/lib/netdev-dpdk.c
@@ -25,10 +25,12 @@ 
 #include <sched.h>
 #include <stdlib.h>
 #include <unistd.h>
-#include <sys/stat.h>
 #include <stdio.h>
 #include <sys/types.h>
 #include <sys/stat.h>
+#include <sys/mman.h>
+#include <sys/syscall.h>
+#include <fcntl.h>
 #include <getopt.h>
 #include <numaif.h>
 
@@ -54,15 +56,22 @@ 
 #include "unaligned.h"
 #include "timeval.h"
 #include "unixctl.h"
+#include "process.h"
 
 #include "rte_config.h"
 #include "rte_mbuf.h"
 #include "rte_meter.h"
 #include "rte_virtio_net.h"
+#include "rte_keepalive.h"
 
 VLOG_DEFINE_THIS_MODULE(dpdk);
 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 20);
+static struct rte_keepalive *rte_global_keepalive_info;
+uint64_t keepalive_timer_interval;
+bool ovs_keepalive_enable = false;
 
+#define OVS_KEEPALIVE_SHM_NAME "/dpdk_keepalive_shm_name"
+#define OVS_KEEPALIVE_TIMEOUT 100
 #define DPDK_PORT_WATCHDOG_INTERVAL 5
 
 #define OVS_CACHE_LINE_SIZE CACHE_LINE_SIZE
@@ -390,6 +399,30 @@  struct netdev_rxq_dpdk {
     int port_id;
 };
 
+/*
+ * OVS Shared Memory structure
+ *
+ * The information in the shared memory block will be read by collectd.
+ */
+struct ovs_keepalive_shm {
+    /* IPC semaphore. Posted when a core dies */
+    sem_t core_died;
+
+    /*
+     * Relayed status of each core.
+     * UNUSED[0], ALIVE[1], DEAD[2], GONE[3], MISSING[4], DOZING[5], SLEEP[6]
+     */
+    enum rte_keepalive_state core_state[RTE_KEEPALIVE_MAXCORES];
+
+    /* Last seen timestamp of the core */
+    uint64_t core_last_seen_times[RTE_KEEPALIVE_MAXCORES];
+
+    /* Store pmd thread tid */
+    pid_t thread_id[RTE_KEEPALIVE_MAXCORES];
+};
+
+static struct ovs_keepalive_shm *ka_shm;
+
 static bool dpdk_thread_is_pmd(void);
 
 static int netdev_dpdk_construct(struct netdev *);
@@ -524,6 +557,158 @@  dpdk_mp_put(struct dpdk_mp *dmp)
 #endif
 }
 
+/* Callback function invoked on heartbeat miss. */
+static void
+failcore_cb(void *ptr_data, const int core_id)
+{
+    struct ovs_keepalive_shm *ka_shm = (struct ovs_keepalive_shm *)ptr_data;
+    int tid = ka_shm->thread_id[core_id];
+
+    if (get_process_status(tid) != ACTIVE_STATE) {
+        VLOG_INFO("Pmd thread tid %d on %d is unresponsive \n", tid, core_id);
+    } else {
+        VLOG_INFO("False positive and ignoring the alarm, pmd thread \
+                   is active and tid is %d\n", tid);
+    }
+}
+
+/* Notify the external monitoring application for change in core state.  On
+ * a consecutive heartbeat miss the core is considered dead and the status
+ * is relayed to collectd threads by unlocking the semaphore.
+ */
+static void
+ka_relay_core_state(void *ptr_data, const int core_id,
+       const enum rte_keepalive_state core_state, uint64_t last_alive)
+{
+    struct ovs_keepalive_shm *ka_shm = (struct ovs_keepalive_shm *)ptr_data;
+    int count;
+
+    ka_shm->core_state[core_id] = core_state;
+    ka_shm->core_last_seen_times[core_id] = last_alive;
+
+    if (OVS_UNLIKELY(core_state == RTE_KA_STATE_DEAD)) {
+        /* To handle inactive collectd, increment the semaphore
+         * if count is '0'. */
+        if (sem_getvalue(&ka_shm->core_died, &count) == -1) {
+            VLOG_WARN("Semaphore check failed\n");
+            return;
+        }
+
+        if (count > 1) {
+            return;
+        }
+
+        if (sem_post(&ka_shm->core_died) != 0) {
+            VLOG_INFO("Failed to increment semaphore\n");
+        }
+    }
+}
+
+/* Create POSIX Shared memory object and initialize the semaphore. */
+static
+struct ovs_keepalive_shm *ovs_keepalive_shm_create(void)
+{
+    int fd;
+    int coreid;
+    struct ovs_keepalive_shm *ka_shm;
+
+    if (shm_unlink(OVS_KEEPALIVE_SHM_NAME) == -1 && errno != ENOENT) {
+        printf("Warning: Error unlinking stale %s \n", OVS_KEEPALIVE_SHM_NAME);
+    }
+
+    if ((fd = shm_open(OVS_KEEPALIVE_SHM_NAME,
+           O_CREAT | O_TRUNC | O_RDWR, 0666)) < 0) {
+        VLOG_WARN("Failed to open %s as SHM \n", OVS_KEEPALIVE_SHM_NAME);
+    } else if (ftruncate(fd, sizeof(struct ovs_keepalive_shm)) != 0) {
+        VLOG_WARN("Failed to resize SHM \n");
+    } else {
+        ka_shm = (struct ovs_keepalive_shm *) mmap(
+           0, sizeof(struct ovs_keepalive_shm),
+            PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
+        close(fd);
+        if (ka_shm == MAP_FAILED) {
+            VLOG_WARN("Failed to mmap SHM \n");
+        } else {
+            memset(ka_shm, 0, sizeof(struct ovs_keepalive_shm));
+
+            /* Initialize the semaphores for IPC/SHM use */
+            if (sem_init(&ka_shm->core_died, 1, 0) != 0) {
+                VLOG_WARN("Failed to setup SHM semaphore \n");
+                return NULL;
+            }
+
+            /* Mark all cores to 'not present' */
+            for (coreid = 0; coreid < RTE_KEEPALIVE_MAXCORES; coreid++) {
+                ka_shm->core_state[coreid] = RTE_KA_STATE_UNUSED;
+                ka_shm->core_last_seen_times[coreid] = 0;
+            }
+
+            return ka_shm;
+        }
+    }
+    return NULL;
+}
+
+/* Initialize Keepalive sub-system and register callback. */
+static int
+keepalive_init(void)
+{
+    /* Create shared memory block */
+    ka_shm = ovs_keepalive_shm_create();
+    if (ka_shm == NULL) {
+        VLOG_ERR("ovs_keepalive_shm_create() failed\n");
+        return -1;
+    }
+
+    /* Initialize keepalive subsystem */
+    if ((rte_global_keepalive_info =
+            rte_keepalive_create(&failcore_cb, ka_shm)) == NULL) {
+        VLOG_ERR("Keepalive initialization failed\n");
+        return -1;
+    } else {
+        rte_keepalive_register_relay_callback(rte_global_keepalive_info,
+            ka_relay_core_state, ka_shm);
+    }
+
+    return 0;
+}
+
+/* Retrieve and return the Keepalive interval from OVSDB */
+static uint64_t
+get_ka_timer_interval(const struct smap *ovs_other_config)
+{
+    const char *keepalive_timer;
+
+    /* Timer granularity in milliseconds
+     * Defaults to OVS_KEEPALIVE_TIMEOUT (100 ms) if not set in ovsdb */
+    keepalive_timer = smap_get(ovs_other_config, "dpdk-keepalive-interval");
+    keepalive_timer_interval = keepalive_timer ?
+        strtoull(keepalive_timer, NULL, 10) : OVS_KEEPALIVE_TIMEOUT;
+
+    /* if timer_interval set to '0', reset it to OVS_KEEPALIVE_TIMEOUT ms */
+    if (!keepalive_timer_interval) {
+        keepalive_timer_interval = OVS_KEEPALIVE_TIMEOUT;
+    }
+
+    VLOG_INFO("The keepalive_timer is %lu(ms)\n", keepalive_timer_interval);
+
+    return keepalive_timer_interval;
+}
+
+/* Keepalive thread. */
+static void *
+ovs_keepalive(void *dummy OVS_UNUSED)
+{
+    pthread_detach(pthread_self());
+
+    for (;;) {
+        rte_keepalive_dispatch_pings(NULL, rte_global_keepalive_info);
+        xusleep(keepalive_timer_interval * 1000);
+    }
+
+    return NULL;
+}
+
 static void
 check_link_status(struct netdev_dpdk *dev)
 {
@@ -3234,6 +3419,11 @@  dpdk_init__(const struct smap *ovs_other_config)
 
     VLOG_INFO("DPDK Enabled, initializing");
 
+    if (smap_get_bool(ovs_other_config, "dpdk-keepalive", false)) {
+        ovs_keepalive_enable = true;
+        VLOG_INFO("OVSDPDK keepalive enabled \n");
+    }
+
 #ifdef VHOST_CUSE
     if (process_vhost_flags("cuse-dev-name", xstrdup("vhost-net"),
                             PATH_MAX, ovs_other_config, &cuse_dev_name)) {
@@ -3352,6 +3542,14 @@  dpdk_init__(const struct smap *ovs_other_config)
 
     ovs_thread_create("dpdk_watchdog", dpdk_watchdog, NULL);
 
+    /* OVSDPDK keepalive initialization. */
+    if (dpdk_is_ka_enabled()) {
+        if (keepalive_init() != -1) {
+            keepalive_timer_interval = get_ka_timer_interval(ovs_other_config);
+            ovs_thread_create("ovs_keepalive", ovs_keepalive, NULL);
+        }
+    }
+
 #ifdef VHOST_CUSE
     /* Register CUSE device to handle IOCTLs.
      * Unless otherwise specified, cuse_dev_name is set to vhost-net.
@@ -3463,3 +3661,35 @@  dpdk_thread_is_pmd(void)
 {
     return rte_lcore_id() != NON_PMD_CORE_ID;
 }
+
+inline bool
+dpdk_is_ka_enabled()
+{
+    return ovs_keepalive_enable;
+}
+
+void
+dpdk_ka_register_core(unsigned core_id)
+{
+    if (dpdk_is_ka_enabled()) {
+        rte_keepalive_register_core(rte_global_keepalive_info, core_id);
+    }
+}
+
+void
+dpdk_ka_mark_core_alive(void)
+{
+    if (dpdk_is_ka_enabled()) {
+        rte_keepalive_mark_alive(rte_global_keepalive_info);
+    }
+}
+
+void
+dpdk_ka_shm_store_tid(unsigned core_id)
+{
+    if (dpdk_is_ka_enabled()) {
+#ifndef _WIN32
+        ka_shm->thread_id[core_id] = syscall(SYS_gettid);
+#endif
+    }
+}
diff --git a/lib/netdev-dpdk.h b/lib/netdev-dpdk.h
index 80bb834..91c6453 100644
--- a/lib/netdev-dpdk.h
+++ b/lib/netdev-dpdk.h
@@ -2,6 +2,7 @@ 
 #define NETDEV_DPDK_H
 
 #include <config.h>
+#include <semaphore.h>
 
 struct dp_packet;
 struct smap;
@@ -26,6 +27,10 @@  struct smap;
 void netdev_dpdk_register(void);
 void free_dpdk_buf(struct dp_packet *);
 void dpdk_set_lcore_id(unsigned cpu);
+void dpdk_ka_shm_store_tid(unsigned core_id);
+void dpdk_ka_mark_core_alive(void);
+void dpdk_ka_register_core(unsigned core_id);
+bool dpdk_is_ka_enabled(void);
 
 #else
 
@@ -51,6 +56,30 @@  dpdk_set_lcore_id(unsigned cpu OVS_UNUSED)
     /* Nothing */
 }
 
+static inline void
+dpdk_ka_register_core(unsigned core_id OVS_UNUSED)
+{
+    /* Nothing */
+}
+
+static inline void
+dpdk_ka_mark_core_alive(void)
+{
+    /* Nothing */
+}
+
+static inline void
+dpdk_ka_shm_store_tid(unsigned core_id OVS_UNUSED)
+{
+    /* Nothing */
+}
+
+static inline bool
+dpdk_is_ka_enabled(void)
+{
+    return false;
+}
+
 #endif /* DPDK_NETDEV */
 
 void dpdk_init(const struct smap *ovs_other_config);
diff --git a/lib/process.c b/lib/process.c
index e9d0ba9..85d50b9 100644
--- a/lib/process.c
+++ b/lib/process.c
@@ -50,6 +50,20 @@  struct process {
     int status;
 };
 
+struct pstate2Num {
+    char *tidState;
+    int num;
+};
+
+const struct pstate2Num pstate_map[] = {
+    { "S", STOPPED_STATE },
+    { "R", ACTIVE_STATE },
+    { "t", TRACED_STATE },
+    { "Z", DEFUNC_STATE },
+    { "D", UNINTERRUPTIBLE_SLEEP_STATE },
+    { "NULL", UNUSED_STATE },
+};
+
 /* Pipe used to signal child termination. */
 static int fds[2];
 
@@ -390,6 +404,49 @@  process_run(void)
 #endif
 }
 
+int
+get_process_status(int tid)
+{
+#ifndef _WIN32
+    static char process_name[20];
+    FILE *stream;
+    char line[256];
+    char Name[10], value[5], status[10];
+    int i, ln;
+
+    snprintf(process_name, sizeof(process_name),
+             "/proc/%d/status", tid);
+    stream = fopen(process_name, "r");
+    if (stream == NULL) {
+        VLOG_WARN_ONCE("%s: open failed: %s", process_name,
+            ovs_strerror(errno));
+        return errno;
+    }
+
+    ln=0;
+    while (fgets(line, sizeof line, stream)) {
+        if (!ovs_scan(line,
+                      "%16s %4s %14s\n",
+                       Name, value, status)) {
+            VLOG_WARN_ONCE("%s: could not parse line %d: %s",
+                    process_name, ln, line);
+            continue;
+        }
+        if (!strcmp(Name, "State:")) {
+            for (i=0; pstate_map[i].tidState != NULL; i++) {
+                if (strcmp(pstate_map[i].tidState, value) == 0) {
+                    VLOG_DBG("The state is %s, status is %d\n",
+                        pstate_map[i].tidState, pstate_map[i].num);
+                    return pstate_map[i].num;
+                }
+            }
+            break;
+        }
+        ln++;
+   }
+   return 0;
+#endif
+}
 
 /* Causes the next call to poll_block() to wake up when process 'p' has
  * exited. */
diff --git a/lib/process.h b/lib/process.h
index 3feac7e..283991f 100644
--- a/lib/process.h
+++ b/lib/process.h
@@ -20,6 +20,15 @@ 
 #include <stdbool.h>
 #include <sys/types.h>
 
+enum process_states {
+    UNUSED_STATE,
+    STOPPED_STATE,
+    ACTIVE_STATE,
+    TRACED_STATE,
+    DEFUNC_STATE,
+    UNINTERRUPTIBLE_SLEEP_STATE
+};
+
 struct process;
 
 /* Starting and monitoring subprocesses.
@@ -38,6 +47,7 @@  bool process_exited(struct process *);
 int process_status(const struct process *);
 void process_run(void);
 void process_wait(struct process *);
+int get_process_status(int);
 
 /* These functions are thread-safe. */
 char *process_status_msg(int);
diff --git a/lib/util.c b/lib/util.c
index e1dc3d2..095b1b1 100644
--- a/lib/util.c
+++ b/lib/util.c
@@ -2086,6 +2086,18 @@  xsleep(unsigned int seconds)
     ovsrcu_quiesce_end();
 }
 
+void
+xusleep(unsigned int microseconds)
+{
+    ovsrcu_quiesce_start();
+#ifdef _WIN32
+    Sleep(microseconds/1000);
+#else
+    usleep(microseconds);
+#endif
+    ovsrcu_quiesce_end();
+}
+
 /* Determine whether standard output is a tty or not. This is useful to decide
  * whether to use color output or not when --color option for utilities is set
  * to `auto`.
diff --git a/lib/util.h b/lib/util.h
index e738c9f..fd360d2 100644
--- a/lib/util.h
+++ b/lib/util.h
@@ -438,6 +438,7 @@  ovs_u128_and(const ovs_u128 a, const ovs_u128 b)
 }
 
 void xsleep(unsigned int seconds);
+void xusleep(unsigned int microseconds);
 
 bool is_stdout_a_tty(void);
 
diff --git a/vswitchd/vswitch.xml b/vswitchd/vswitch.xml
index 072fef4..54ffe9e 100644
--- a/vswitchd/vswitch.xml
+++ b/vswitchd/vswitch.xml
@@ -275,6 +275,34 @@ 
         </p>
       </column>
 
+      <column name="other_config" key="dpdk-keepalive"
+              type='{"type": "boolean"}'>
+        <p>
+          Set this value to <code>true</code> to enable DPDK keepalive
+          feature. The vswitch must have compile-time support for DPDK as
+          well.
+        </p>
+        <p>
+          The default value is <code>false</code>. Changing this value requires
+          restarting the daemon
+        </p>
+        <p>
+          If this value is <code>false</code> at startup, keepalive thread
+          shall not be spawned.
+        </p>
+      </column>
+
+      <column name="other_config" key="dpdk-keepalive-interval"
+              type='{"type": "string"}'>
+        <p>
+          Specifies the DPDK keepalive interval value.
+        </p>
+        <p>
+          If not specified, this will be guessed by the DPDK library (default
+          is 100ms). Changing this value requires restarting the daemon.
+        </p>
+      </column>
+
       <column name="other_config" key="dpdk-extra"
               type='{"type": "string"}'>
         <p>