@@ -33,6 +33,9 @@ extern "C" {
struct match;
struct mf_field;
struct ofputil_port_map;
+struct tun_table;
+struct flow_wildcards;
+struct ofputil_port_map;
struct ofp_protocol {
const char *name;
@@ -22,6 +22,7 @@
#include <fcntl.h>
#include <inttypes.h>
#include <net/if.h>
+#include <math.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <stdint.h>
@@ -42,6 +43,7 @@
#include "dpif.h"
#include "dpif-netdev-perf.h"
#include "dpif-provider.h"
+#include "netdev-provider.h"
#include "dummy.h"
#include "fat-rwlock.h"
#include "flow.h"
@@ -49,7 +51,6 @@
#include "id-pool.h"
#include "latch.h"
#include "netdev.h"
-#include "netdev-provider.h"
#include "netdev-vport.h"
#include "netlink.h"
#include "odp-execute.h"
@@ -460,6 +461,7 @@ struct dp_netdev_port {
struct ovs_mutex txq_used_mutex;
char *type; /* Port type as requested by user. */
char *rxq_affinity_list; /* Requested affinity of rx queues. */
+ int ingress_prio; /* 0 lowest to 3 highest. Default 0. */
};
/* Contained by struct dp_netdev_flow's 'stats' member. */
@@ -572,6 +574,7 @@ static void dp_netdev_actions_free(struct dp_netdev_actions *);
struct polled_queue {
struct dp_netdev_rxq *rxq;
odp_port_t port_no;
+ uint8_t max_reads;
};
/* Contained by struct dp_netdev_pmd_thread's 'poll_list' member. */
@@ -711,6 +714,10 @@ struct dpif_netdev {
uint64_t last_port_seq;
};
+static int
+dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
+ struct dp_netdev_rxq *rxq,
+ odp_port_t port_no);
static int get_port_by_number(struct dp_netdev *dp, odp_port_t port_no,
struct dp_netdev_port **portp)
OVS_REQUIRES(dp->port_mutex);
@@ -3847,6 +3854,36 @@ exit:
return error;
}
+static void
+set_need_reload_on_all_pmds_for_port(struct dp_netdev *dp, odp_port_t port_no)
+{
+ /* Check each pmd to see if it is reading a queue belonging to
+ port_no and if so set need_reload of that pmd */
+ struct dp_netdev_pmd_thread *pmd;
+ CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+ struct rxq_poll *poll;
+ HMAP_FOR_EACH (poll, node, &pmd->poll_list) {
+ if (poll->rxq->port->port_no == port_no) {
+ pmd->need_reload = true;
+ }
+ }
+ }
+}
+
+static void
+reload_affected_pmds(struct dp_netdev *dp)
+{
+ struct dp_netdev_pmd_thread *pmd;
+
+ CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+ if (pmd->need_reload) {
+ flow_mark_flush(pmd);
+ dp_netdev_reload_pmd__(pmd);
+ pmd->need_reload = false;
+ }
+ }
+}
+
/* Changes the affinity of port's rx queues. The changes are actually applied
* in dpif_netdev_run(). */
static int
@@ -3859,20 +3896,41 @@ dpif_netdev_port_set_config(struct dpif *dpif, odp_port_t port_no,
const char *affinity_list = smap_get(cfg, "pmd-rxq-affinity");
ovs_mutex_lock(&dp->port_mutex);
+
error = get_port_by_number(dp, port_no, &port);
- if (error || !netdev_is_pmd(port->netdev)
- || nullable_string_is_equal(affinity_list, port->rxq_affinity_list)) {
+ if (error || !netdev_is_pmd(port->netdev)) {
goto unlock;
}
- error = dpif_netdev_port_set_rxq_affinity(port, affinity_list);
- if (error) {
- goto unlock;
+ if (!nullable_string_is_equal(affinity_list, port->rxq_affinity_list)) {
+ error = dpif_netdev_port_set_rxq_affinity(port, affinity_list);
+ if (!error) {
+ free(port->rxq_affinity_list);
+ port->rxq_affinity_list = nullable_xstrdup(affinity_list);
+ dp_netdev_request_reconfigure(dp);
+ }
+ }
+
+ const char *port_prio_str = smap_get(cfg, "port_prio");
+ uint8_t port_prio;
+ char *mallocd_err_str; /* str_to_x mallocs a str we'll need to free */
+ if (port_prio_str) {
+ mallocd_err_str = str_to_u8(port_prio_str, "port_prio",
+ &port_prio);
+ if (!mallocd_err_str) {
+ if (port->ingress_prio != port_prio) {
+ port->ingress_prio = port_prio;
+ set_need_reload_on_all_pmds_for_port(dp, port_no);
+ reload_affected_pmds(dp);
+ }
+ } else {
+ VLOG_ERR ("%s while parsing ingress_sched:port_prio for %s",
+ mallocd_err_str, port->netdev->name);
+ free(mallocd_err_str);
+ mallocd_err_str = NULL;
+ }
}
- free(port->rxq_affinity_list);
- port->rxq_affinity_list = nullable_xstrdup(affinity_list);
- dp_netdev_request_reconfigure(dp);
unlock:
ovs_mutex_unlock(&dp->port_mutex);
return error;
@@ -4434,20 +4492,6 @@ rxq_scheduling(struct dp_netdev *dp, bool pinned) OVS_REQUIRES(dp->port_mutex)
}
static void
-reload_affected_pmds(struct dp_netdev *dp)
-{
- struct dp_netdev_pmd_thread *pmd;
-
- CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
- if (pmd->need_reload) {
- flow_mark_flush(pmd);
- dp_netdev_reload_pmd__(pmd);
- pmd->need_reload = false;
- }
- }
-}
-
-static void
reconfigure_pmd_threads(struct dp_netdev *dp)
OVS_REQUIRES(dp->port_mutex)
{
@@ -4741,11 +4785,13 @@ dpif_netdev_run(struct dpif *dpif)
HMAP_FOR_EACH (port, node, &dp->ports) {
if (!netdev_is_pmd(port->netdev)) {
int i;
+ unsigned int rxd_cnt;
for (i = 0; i < port->n_rxq; i++) {
- if (dp_netdev_process_rxq_port(non_pmd,
- &port->rxqs[i],
- port->port_no)) {
+ rxd_cnt = dp_netdev_process_rxq_port(non_pmd,
+ &port->rxqs[i],
+ port->port_no);
+ if (rxd_cnt) {
need_to_flush = false;
}
}
@@ -4873,6 +4919,29 @@ pmd_free_static_tx_qid(struct dp_netdev_pmd_thread *pmd)
ovs_mutex_unlock(&pmd->dp->tx_qid_pool_mutex);
}
+/* Return the number of rxq descriptors for a netdev's rxqs or -1 if not
+ * available e.g. a vhostuser device that has not yet been configured by it's
+ * driver. */
+static int
+get_nb_rxqdesc (struct netdev *netdev) {
+ struct smap smap = SMAP_INITIALIZER(&smap);
+ netdev_get_config(netdev, &smap);
+ const char *n_rxq_s = smap_get(&smap, "configured_rxq_descriptors");
+ long n_rxq;
+ if (n_rxq_s) {
+ str_to_long(n_rxq_s, 10, &n_rxq);
+ } else {
+ n_rxq = -1;
+ }
+ smap_destroy(&smap);
+ return (int) n_rxq;
+}
+
+#define MAX_PRIO_READS (48)
+#define MIN_PRIO_READS (1)
+#define RAW_TO_NORM_FN_EXP (-0.0280)
+#define PRIO_TO_MAX_READS_SCALAR (10)
+
static int
pmd_load_queues_and_ports(struct dp_netdev_pmd_thread *pmd,
struct polled_queue **ppoll_list)
@@ -4885,13 +4954,66 @@ pmd_load_queues_and_ports(struct dp_netdev_pmd_thread *pmd,
poll_list = xrealloc(poll_list, hmap_count(&pmd->poll_list)
* sizeof *poll_list);
+ /* Find max rxq len - used to weight raw priority to account for differing
+ * queue lengths. Has no effect on q's for non-prioritized netdevs. */
+ int max_nb_rxqdesc = 0;
+ HMAP_FOR_EACH (poll, node, &pmd->poll_list) {
+ int nb_rxqdesc = get_nb_rxqdesc(poll->rxq->rx->netdev);
+ if (nb_rxqdesc > max_nb_rxqdesc) {
+ max_nb_rxqdesc = nb_rxqdesc;
+ }
+ }
+
+ /* Populate ppoll_list; Assign 'raw' queue q priorities. */
i = 0;
+ uint16_t min_raw_prio = UINT16_MAX;
+ uint16_t max_raw_prio = 0;
HMAP_FOR_EACH (poll, node, &pmd->poll_list) {
poll_list[i].rxq = poll->rxq;
poll_list[i].port_no = poll->rxq->port->port_no;
+
+ int nb_rxqdesc = get_nb_rxqdesc(poll->rxq->rx->netdev);
+ int raw_prio;
+ if (nb_rxqdesc > 0) {
+ raw_prio = poll->rxq->port->ingress_prio
+ * PRIO_TO_MAX_READS_SCALAR * max_nb_rxqdesc
+ / nb_rxqdesc;
+ } else {
+ /* Treat queues with unknown rxq len (such as unattached vports)
+ as being as long as the longest rxq */
+ raw_prio = 0;
+ }
+
+ poll_list[i].max_reads = raw_prio;
+ if (raw_prio > max_raw_prio) {
+ max_raw_prio = raw_prio;
+ }
+ if (raw_prio < min_raw_prio) {
+ min_raw_prio = raw_prio;
+ }
i++;
}
+ /* Adjust 'raw' queue priorities so that:
+ * 1. MAX_PRIO_READS is not exeeded.
+ * 2. The lowest max_reads value for the PMD is 1.
+ * 3. The ratio between raw max_reads values is more or less maintained
+ * for lower values but higher values reduced to meet criterion 1.
+ * Using a exponential fn(x): x = a + b^(-ex) with well-chosen parameters
+ * meets these requirements. */
+ int end_idx = i;
+ for (i = 0; i < end_idx; i++) {
+ int current = poll_list[i].max_reads;
+ current -= min_raw_prio;
+ poll_list[i].max_reads = (int) MAX_PRIO_READS -
+ (MAX_PRIO_READS - MIN_PRIO_READS) *
+ exp(RAW_TO_NORM_FN_EXP * current);
+ VLOG_DBG("Port '%s', q %d : max reads %d",
+ poll_list[i].rxq->rx->netdev->name,
+ poll_list[i].rxq->rx->queue_id,
+ poll_list[i].max_reads);
+ }
+
pmd_load_cached_ports(pmd);
ovs_mutex_unlock(&pmd->port_mutex);
@@ -4910,7 +5032,6 @@ pmd_thread_main(void *f_)
bool exiting;
int poll_cnt;
int i;
- int process_packets = 0;
poll_list = NULL;
@@ -4951,10 +5072,15 @@ reload:
pmd_perf_start_iteration(s);
for (i = 0; i < poll_cnt; i++) {
- process_packets =
- dp_netdev_process_rxq_port(pmd, poll_list[i].rxq,
- poll_list[i].port_no);
- rx_packets += process_packets;
+ unsigned int max_reads = poll_list[i].max_reads;
+ unsigned int rxd_pkt_cnt = 0;
+ do {
+ rxd_pkt_cnt =
+ dp_netdev_process_rxq_port(pmd, poll_list[i].rxq,
+ poll_list[i].port_no);
+ rx_packets += rxd_pkt_cnt;
+ max_reads--;
+ } while (rxd_pkt_cnt >= NETDEV_MAX_BURST && max_reads);
}
if (!rx_packets) {
@@ -1044,6 +1044,7 @@ dpdk_eth_dev_init(struct netdev_dpdk *dev)
return -diag;
}
+
diag = rte_eth_dev_start(dev->port_id);
if (diag) {
VLOG_ERR("Interface %s start error: %s", dev->up.name,
@@ -1554,7 +1555,16 @@ netdev_dpdk_get_config(const struct netdev *netdev, struct smap *args)
}
smap_add(args, "lsc_interrupt_mode",
dev->lsc_interrupt_mode ? "true" : "false");
+ } else if (dev->type == DPDK_DEV_VHOST) {
+ int vid = netdev_dpdk_get_vid(dev);
+ if (vid >= 0) {
+ struct rte_vhost_vring vring;
+ rte_vhost_get_vhost_vring(vid, VIRTIO_RXQ, &vring);
+ smap_add_format(args, "configured_rxq_descriptors", "%d",
+ vring.size);
+ }
}
+
ovs_mutex_unlock(&dev->mutex);
return 0;
Allow configuration to specify an ingress priority for interfaces. Modify dpif-netdev datapath to act on this configuration so that packets on interfaces with a higher priority will tend be processed ahead of packets on lower priority interfaces. This protects traffic on higher priority interfaces from packet loss as PMDs get overloaded. Signed-off-by: Billy O'Mahony <billy.o.mahony@intel.com> --- include/openvswitch/ofp-parse.h | 3 + lib/dpif-netdev.c | 188 +++++++++++++++++++++++++++++++++------- lib/netdev-dpdk.c | 10 +++ 3 files changed, 170 insertions(+), 31 deletions(-)