[ovs-dev,per-port,ingress,scheduling,2/2] ingress scheduling: Provide per interface ingress priority

Message ID 1535464703-12708-3-git-send-email-billy.o.mahony@intel.com
State New
Delegated to: Ian Stokes
Headers show
Series
  • [ovs-dev,per-port,ingress,scheduling,1/2] ingress scheduling: documentation
Related show

Commit Message

Billy O'Mahony Aug. 28, 2018, 1:58 p.m.
Allow configuration to specify an ingress priority for interfaces.
Modify dpif-netdev datapath to act on this configuration so that packets
on interfaces with a higher priority will tend to be processed ahead of
packets on lower priority interfaces.  This protects traffic on higher
priority interfaces from packet loss as PMDs get overloaded.

Signed-off-by: Billy O'Mahony <billy.o.mahony@intel.com>
---
 include/openvswitch/ofp-parse.h |   3 +
 lib/dpif-netdev.c               | 188 +++++++++++++++++++++++++++++++++-------
 lib/netdev-dpdk.c               |  10 +++
 3 files changed, 170 insertions(+), 31 deletions(-)

Patch

diff --git a/include/openvswitch/ofp-parse.h b/include/openvswitch/ofp-parse.h
index 3fdd468..d77ab8f 100644
--- a/include/openvswitch/ofp-parse.h
+++ b/include/openvswitch/ofp-parse.h
@@ -33,6 +33,9 @@  extern "C" {
 struct match;
 struct mf_field;
 struct ofputil_port_map;
+struct tun_table;
+struct flow_wildcards;
+struct ofputil_port_map;
 
 struct ofp_protocol {
     const char *name;
diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index 807a462..3ed8e09 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -22,6 +22,7 @@ 
 #include <fcntl.h>
 #include <inttypes.h>
 #include <net/if.h>
+#include <math.h>
 #include <sys/types.h>
 #include <netinet/in.h>
 #include <stdint.h>
@@ -42,6 +43,7 @@ 
 #include "dpif.h"
 #include "dpif-netdev-perf.h"
 #include "dpif-provider.h"
+#include "netdev-provider.h"
 #include "dummy.h"
 #include "fat-rwlock.h"
 #include "flow.h"
@@ -49,7 +51,6 @@ 
 #include "id-pool.h"
 #include "latch.h"
 #include "netdev.h"
-#include "netdev-provider.h"
 #include "netdev-vport.h"
 #include "netlink.h"
 #include "odp-execute.h"
@@ -460,6 +461,7 @@  struct dp_netdev_port {
     struct ovs_mutex txq_used_mutex;
     char *type;                 /* Port type as requested by user. */
     char *rxq_affinity_list;    /* Requested affinity of rx queues. */
+    int ingress_prio;           /* 0 lowest to 3 highest. Default 0. */
 };
 
 /* Contained by struct dp_netdev_flow's 'stats' member.  */
@@ -572,6 +574,7 @@  static void dp_netdev_actions_free(struct dp_netdev_actions *);
 struct polled_queue {
     struct dp_netdev_rxq *rxq;
     odp_port_t port_no;
+    uint8_t max_reads;
 };
 
 /* Contained by struct dp_netdev_pmd_thread's 'poll_list' member. */
@@ -711,6 +714,10 @@  struct dpif_netdev {
     uint64_t last_port_seq;
 };
 
+static int
+dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
+                           struct dp_netdev_rxq *rxq,
+                           odp_port_t port_no);
 static int get_port_by_number(struct dp_netdev *dp, odp_port_t port_no,
                               struct dp_netdev_port **portp)
     OVS_REQUIRES(dp->port_mutex);
@@ -3847,6 +3854,36 @@  exit:
     return error;
 }
 
+/* Sets 'need_reload' on every PMD thread in 'dp' that currently polls at
+ * least one rx queue belonging to port 'port_no'.  Only the flag is set
+ * here; the reload itself is carried out by reload_affected_pmds(). */
+static void
+set_need_reload_on_all_pmds_for_port(struct dp_netdev *dp, odp_port_t port_no)
+{
+    struct dp_netdev_pmd_thread *pmd;
+
+    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+        struct rxq_poll *poll;
+
+        HMAP_FOR_EACH (poll, node, &pmd->poll_list) {
+            if (poll->rxq->port->port_no == port_no) {
+                /* One matching queue is enough to flag this PMD. */
+                pmd->need_reload = true;
+                break;
+            }
+        }
+    }
+}
+
+/* Walks all PMD threads of 'dp' and, for each one whose 'need_reload' flag
+ * is set, flushes its flow marks, reloads it and clears the flag.  PMD
+ * threads that are not flagged are left untouched. */
+static void
+reload_affected_pmds(struct dp_netdev *dp)
+{
+    struct dp_netdev_pmd_thread *pmd;
+
+    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+        if (!pmd->need_reload) {
+            continue;
+        }
+
+        flow_mark_flush(pmd);
+        dp_netdev_reload_pmd__(pmd);
+        pmd->need_reload = false;
+    }
+}
+
 /* Changes the affinity of port's rx queues.  The changes are actually applied
  * in dpif_netdev_run(). */
 static int
@@ -3859,20 +3896,41 @@  dpif_netdev_port_set_config(struct dpif *dpif, odp_port_t port_no,
     const char *affinity_list = smap_get(cfg, "pmd-rxq-affinity");
 
     ovs_mutex_lock(&dp->port_mutex);
+
     error = get_port_by_number(dp, port_no, &port);
-    if (error || !netdev_is_pmd(port->netdev)
-        || nullable_string_is_equal(affinity_list, port->rxq_affinity_list)) {
+    if (error || !netdev_is_pmd(port->netdev)) {
         goto unlock;
     }
 
-    error = dpif_netdev_port_set_rxq_affinity(port, affinity_list);
-    if (error) {
-        goto unlock;
+    if (!nullable_string_is_equal(affinity_list, port->rxq_affinity_list)) {
+        error = dpif_netdev_port_set_rxq_affinity(port, affinity_list);
+        if (!error) {
+            free(port->rxq_affinity_list);
+            port->rxq_affinity_list = nullable_xstrdup(affinity_list);
+            dp_netdev_request_reconfigure(dp);
+        }
+    }
+
+    const char *port_prio_str = smap_get(cfg, "port_prio");
+    uint8_t port_prio;
+    char *mallocd_err_str; /* str_to_x mallocs a str we'll need to free */
+    if (port_prio_str) {
+        mallocd_err_str = str_to_u8(port_prio_str, "port_prio",
+                                &port_prio);
+        if (!mallocd_err_str) {
+            if (port->ingress_prio != port_prio) {
+                port->ingress_prio = port_prio;
+                set_need_reload_on_all_pmds_for_port(dp, port_no);
+                reload_affected_pmds(dp);
+            }
+        } else {
+            VLOG_ERR ("%s while parsing ingress_sched:port_prio for %s",
+                      mallocd_err_str, port->netdev->name);
+            free(mallocd_err_str);
+            mallocd_err_str = NULL;
+        }
     }
-    free(port->rxq_affinity_list);
-    port->rxq_affinity_list = nullable_xstrdup(affinity_list);
 
-    dp_netdev_request_reconfigure(dp);
 unlock:
     ovs_mutex_unlock(&dp->port_mutex);
     return error;
@@ -4434,20 +4492,6 @@  rxq_scheduling(struct dp_netdev *dp, bool pinned) OVS_REQUIRES(dp->port_mutex)
 }
 
 static void
-reload_affected_pmds(struct dp_netdev *dp)
-{
-    struct dp_netdev_pmd_thread *pmd;
-
-    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
-        if (pmd->need_reload) {
-            flow_mark_flush(pmd);
-            dp_netdev_reload_pmd__(pmd);
-            pmd->need_reload = false;
-        }
-    }
-}
-
-static void
 reconfigure_pmd_threads(struct dp_netdev *dp)
     OVS_REQUIRES(dp->port_mutex)
 {
@@ -4741,11 +4785,13 @@  dpif_netdev_run(struct dpif *dpif)
         HMAP_FOR_EACH (port, node, &dp->ports) {
             if (!netdev_is_pmd(port->netdev)) {
                 int i;
+                unsigned int rxd_cnt;
 
                 for (i = 0; i < port->n_rxq; i++) {
-                    if (dp_netdev_process_rxq_port(non_pmd,
-                                                   &port->rxqs[i],
-                                                   port->port_no)) {
+                    rxd_cnt = dp_netdev_process_rxq_port(non_pmd,
+                                               &port->rxqs[i],
+                                               port->port_no);
+                    if (rxd_cnt) {
                         need_to_flush = false;
                     }
                 }
@@ -4873,6 +4919,29 @@  pmd_free_static_tx_qid(struct dp_netdev_pmd_thread *pmd)
     ovs_mutex_unlock(&pmd->dp->tx_qid_pool_mutex);
 }
 
+/* Returns the number of rx queue descriptors that 'netdev' reports under the
+ * "configured_rxq_descriptors" key of its configuration, or -1 if that value
+ * is not available (e.g. a vhostuser device that has not yet been configured
+ * by its driver) or cannot be parsed as a non-negative 'int'. */
+static int
+get_nb_rxqdesc(struct netdev *netdev)
+{
+    struct smap config = SMAP_INITIALIZER(&config);
+    const char *value;
+    long n_desc;
+    int result = -1;
+
+    netdev_get_config(netdev, &config);
+    value = smap_get(&config, "configured_rxq_descriptors");
+
+    /* Only accept the value when parsing succeeded and it survives the
+     * narrowing from 'long' to the 'int' return type. */
+    if (value && str_to_long(value, 10, &n_desc)
+        && n_desc >= 0 && n_desc == (int) n_desc) {
+        result = (int) n_desc;
+    }
+
+    smap_destroy(&config);
+
+    return result;
+}
+
+/* Constants used by pmd_load_queues_and_ports() when turning a port's
+ * 'ingress_prio' into the per-queue 'max_reads' value. */
+
+/* Value that the normalized 'max_reads' approaches for large raw priorities. */
+#define MAX_PRIO_READS (48)
+/* Normalized 'max_reads' of the queue(s) with the lowest raw priority. */
+#define MIN_PRIO_READS (1)
+/* Coefficient multiplied with the raw priority inside exp(). */
+#define RAW_TO_NORM_FN_EXP (-0.0280)
+/* Factor applied to 'ingress_prio' when computing the raw priority. */
+#define PRIO_TO_MAX_READS_SCALAR (10)
+
 static int
 pmd_load_queues_and_ports(struct dp_netdev_pmd_thread *pmd,
                           struct polled_queue **ppoll_list)
@@ -4885,13 +4954,66 @@  pmd_load_queues_and_ports(struct dp_netdev_pmd_thread *pmd,
     poll_list = xrealloc(poll_list, hmap_count(&pmd->poll_list)
                                     * sizeof *poll_list);
 
+    /* Find max rxq len - used to weight raw priority to account for differing
+     * queue lengths. Has no effect on q's for non-prioritized netdevs. */
+    int max_nb_rxqdesc = 0;
+    HMAP_FOR_EACH (poll, node, &pmd->poll_list) {
+        int nb_rxqdesc = get_nb_rxqdesc(poll->rxq->rx->netdev);
+        if (nb_rxqdesc > max_nb_rxqdesc) {
+            max_nb_rxqdesc = nb_rxqdesc;
+        }
+    }
+
+    /* Populate ppoll_list; Assign 'raw' queue q priorities. */
     i = 0;
+    uint16_t min_raw_prio = UINT16_MAX;
+    uint16_t max_raw_prio = 0;
     HMAP_FOR_EACH (poll, node, &pmd->poll_list) {
         poll_list[i].rxq = poll->rxq;
         poll_list[i].port_no = poll->rxq->port->port_no;
+
+        int nb_rxqdesc = get_nb_rxqdesc(poll->rxq->rx->netdev);
+        int raw_prio;
+        if (nb_rxqdesc > 0) {
+            raw_prio = poll->rxq->port->ingress_prio
+                             * PRIO_TO_MAX_READS_SCALAR * max_nb_rxqdesc
+                             / nb_rxqdesc;
+        } else {
+            /* Treat queues with unknown rxq len (such as unattached vports)
+               as being as long as the longest rxq */
+            raw_prio = 0;
+        }
+
+        poll_list[i].max_reads = raw_prio;
+        if (raw_prio > max_raw_prio) {
+            max_raw_prio = raw_prio;
+        }
+        if (raw_prio < min_raw_prio) {
+            min_raw_prio = raw_prio;
+        }
         i++;
     }
 
+    /* Adjust 'raw' queue priorities so that:
+     * 1. MAX_PRIO_READS is not exceeded.
+     * 2. The lowest max_reads value for the PMD is MIN_PRIO_READS (1).
+     * 3. The ratio between raw max_reads values is more or less maintained
+     *    for lower values but higher values reduced to meet criterion 1.
+     * The saturating exponential f(x) = MAX - (MAX - MIN) * exp(k * x), with
+     * k = RAW_TO_NORM_FN_EXP < 0, meets these requirements. */
+    int end_idx = i;
+    for (i = 0; i < end_idx; i++) {
+        int current = poll_list[i].max_reads;
+        current -= min_raw_prio;
+        poll_list[i].max_reads = (int) MAX_PRIO_READS -
+                                (MAX_PRIO_READS - MIN_PRIO_READS) *
+                                exp(RAW_TO_NORM_FN_EXP * current);
+        VLOG_DBG("Port '%s', q %d : max reads %d",
+                     poll_list[i].rxq->rx->netdev->name,
+                     poll_list[i].rxq->rx->queue_id,
+                     poll_list[i].max_reads);
+    }
+
     pmd_load_cached_ports(pmd);
 
     ovs_mutex_unlock(&pmd->port_mutex);
@@ -4910,7 +5032,6 @@  pmd_thread_main(void *f_)
     bool exiting;
     int poll_cnt;
     int i;
-    int process_packets = 0;
 
     poll_list = NULL;
 
@@ -4951,10 +5072,15 @@  reload:
         pmd_perf_start_iteration(s);
 
         for (i = 0; i < poll_cnt; i++) {
-            process_packets =
-                dp_netdev_process_rxq_port(pmd, poll_list[i].rxq,
-                                           poll_list[i].port_no);
-            rx_packets += process_packets;
+            unsigned int max_reads = poll_list[i].max_reads;
+            unsigned int rxd_pkt_cnt = 0;
+            do {
+                rxd_pkt_cnt =
+                    dp_netdev_process_rxq_port(pmd, poll_list[i].rxq,
+                                               poll_list[i].port_no);
+                rx_packets += rxd_pkt_cnt;
+                max_reads--;
+            } while (rxd_pkt_cnt >= NETDEV_MAX_BURST && max_reads);
         }
 
         if (!rx_packets) {
diff --git a/lib/netdev-dpdk.c b/lib/netdev-dpdk.c
index f91aa27..8defa6f 100644
--- a/lib/netdev-dpdk.c
+++ b/lib/netdev-dpdk.c
@@ -1044,6 +1044,7 @@  dpdk_eth_dev_init(struct netdev_dpdk *dev)
         return -diag;
     }
 
+
     diag = rte_eth_dev_start(dev->port_id);
     if (diag) {
         VLOG_ERR("Interface %s start error: %s", dev->up.name,
@@ -1554,7 +1555,16 @@  netdev_dpdk_get_config(const struct netdev *netdev, struct smap *args)
         }
         smap_add(args, "lsc_interrupt_mode",
                  dev->lsc_interrupt_mode ? "true" : "false");
+    } else if (dev->type == DPDK_DEV_VHOST) {
+        int vid = netdev_dpdk_get_vid(dev);
+        if (vid >= 0) {
+            struct rte_vhost_vring vring;
+            rte_vhost_get_vhost_vring(vid, VIRTIO_RXQ, &vring);
+            smap_add_format(args, "configured_rxq_descriptors", "%d",
+                            vring.size);
+        }
     }
+
     ovs_mutex_unlock(&dev->mutex);
 
     return 0;