@@ -467,3 +467,44 @@ Command to set interrupt mode for a specific interface::
Command to set polling mode for a specific interface::
$ ovs-vsctl set interface <iface_name> options:dpdk-lsc-interrupt=false
+
+RXQ-Steering
+------------
+
+Typically one RXQ is not sufficient to handle workloads from a high-speed NIC,
+which leads to multiple RXQs.With more RXQ comes the challenge of Placing the
+flows on the right queue.
+
+Though RSS can provide some relief by placing different flows at different
+Queues,It does not provide the flexibility of placing the flow on the optimal
+Queue.
+
+Placing the flow on the right queue is important based on the observations in
+the below scenarios.
+
+1.VM memory is one NUMA and flows land on a RXQ which is handled by a different
+PMD,leading to memory access over NUMA.
+2.5tuple-based RSS placing two different flows towards the Same VM on two
+different PMD threads, leading to the two PMD threads contending on the same
+virtq spin lock.
+
+To gain finer control on the placement of guest flows, RXQ steering is
+introduced to place the right flow to the most optimal RXQ using flow
+director/rte_flow API's to mitigate some of the above points mentioned.
+
+To add an RXQ steering rule, Fill in the MAC address of the VM and Queue to
+which you want to re-direct the packet.Every time the below string value
+changes,all the existing rules are flushed and new rules are inserted::
+
+ $ovs-vsctl set interface <iface_name> \
+ options:rxq-steer-params="<qno_1>|<VM_MAC_1>,<qno_2>|<VM_MAC_2>"
+
+To remove RXQ steering, set the value to be zero.All existing rules will be
+flushed::
+
+ $ovs-vsctl set interface <iface_name> \
+ options:rxq-steer-params="0"
+
+To isolate the RXQ only to the configured flows, use rxq_isolate like below::
+
+ $ovs-vsctl set interface eno1 options:rxq-isolate=true
@@ -156,6 +156,7 @@ typedef uint16_t dpdk_port_t;
| RTE_ETH_TX_OFFLOAD_UDP_CKSUM \
| RTE_ETH_TX_OFFLOAD_IPV4_CKSUM)
+#define RXQ_STEERING_FLOWS_MAX 10
static const struct rte_eth_conf port_conf = {
.rxmode = {
@@ -529,6 +530,14 @@ struct netdev_dpdk {
/* VF configuration. */
struct eth_addr requested_hwaddr;
+
+ /* RXQ flow steering */
+ char *requested_rxq_steer_params;
+ char *rxq_steer_params;
+ uint8_t flow_count;
+ bool rxq_isolate;
+ bool requested_rxq_isolate;
+ struct rte_flow *steering_flows[RXQ_STEERING_FLOWS_MAX];
);
PADDED_MEMBERS(CACHE_LINE_SIZE,
@@ -558,6 +567,16 @@ int netdev_dpdk_get_vid(const struct netdev_dpdk *dev);
struct ingress_policer *
netdev_dpdk_get_ingress_policer(const struct netdev_dpdk *dev);
+
+static struct rte_flow *
+generate_eth_flow(uint16_t port_id, short rx_q, struct eth_addr *dst_addr);
+static int
+flush_steering_flows(struct netdev_dpdk *dev);
+static int
+parse_flow_steer_params(char *flow_steer_params, struct netdev_dpdk *dev);
+static void
+configure_rxq_isolate(struct netdev_dpdk *dev, bool rxq_isolate);
+
static bool
is_dpdk_class(const struct netdev_class *class)
{
@@ -1911,6 +1930,28 @@ dpdk_set_rxq_config(struct netdev_dpdk *dev, const struct smap *args)
}
static void
+dpdk_process_rxq_steer_params(struct netdev_dpdk *dev, const struct smap *args)
+ OVS_REQUIRES(dev->mutex)
+{
+ const char *rxq_steer_params = smap_get(args, "rxq-steer-params");
+ bool rxq_isolate = smap_get_bool(args, "rxq-isolate", false);
+
+ if (!nullable_string_is_equal(dev->requested_rxq_steer_params,
+ rxq_steer_params)) {
+ free(dev->requested_rxq_steer_params);
+ dev->requested_rxq_steer_params = nullable_xstrdup(rxq_steer_params);
+ dev->rxq_steer_params = NULL;
+ netdev_request_reconfigure(&dev->up);
+ }
+
+ if (dev->requested_rxq_isolate != rxq_isolate) {
+ dev->requested_rxq_isolate = rxq_isolate;
+ netdev_request_reconfigure(&dev->up);
+ }
+}
+
+
+static void
dpdk_process_queue_size(struct netdev *netdev, const struct smap *args,
const char *flag, int default_size, int *new_size)
{
@@ -1948,6 +1989,8 @@ netdev_dpdk_set_config(struct netdev *netdev, const struct smap *args,
dpdk_set_rxq_config(dev, args);
+ dpdk_process_rxq_steer_params(dev, args);
+
dpdk_process_queue_size(netdev, args, "n_rxq_desc",
NIC_PORT_DEFAULT_RXQ_SIZE,
&dev->requested_rxq_size);
@@ -4949,7 +4992,9 @@ netdev_dpdk_reconfigure(struct netdev *netdev)
&& dev->txq_size == dev->requested_txq_size
&& eth_addr_equals(dev->hwaddr, dev->requested_hwaddr)
&& dev->socket_id == dev->requested_socket_id
- && dev->started && !dev->reset_needed) {
+ && dev->started && !dev->reset_needed
+ && dev->rxq_steer_params != NULL
+ && dev->rxq_isolate == dev->requested_rxq_isolate) {
/* Reconfiguration is unnecessary */
goto out;
@@ -5015,6 +5060,25 @@ netdev_dpdk_reconfigure(struct netdev *netdev)
err = ENOMEM;
}
+ /*rxq-steer-params
+ * eg. rxq-steer-params:"0|aa:bb:cc:dd:ee:ff,1|aa:bb:cc:dd:eb:ed"
+ * rxq_steer_params:"<q_no_1>:<mac_addr_1>,<q_no_2>:<mac_addr_2>"
+ */
+
+ if (dev->rxq_steer_params == NULL) {
+ char *flow_steer_params =
+ nullable_xstrdup(dev->requested_rxq_steer_params);
+ if (flow_steer_params) {
+ parse_flow_steer_params(flow_steer_params, dev);
+ }
+ free(flow_steer_params);
+ dev->rxq_steer_params = dev->requested_rxq_steer_params;
+ }
+
+ if (dev->rxq_isolate != dev->requested_rxq_isolate) {
+ configure_rxq_isolate(dev, dev->requested_rxq_isolate);
+ dev->rxq_isolate = dev->requested_rxq_isolate;
+ }
netdev_change_seq_changed(netdev);
out:
@@ -5531,3 +5595,147 @@ netdev_dpdk_register(const struct smap *ovs_other_config)
netdev_register_provider(&dpdk_vhost_class);
netdev_register_provider(&dpdk_vhost_client_class);
}
+
+static int
+parse_flow_steer_params(char *flow_steer_params, struct netdev_dpdk *dev) {
+ char *next_rule_ptr = NULL;
+ char *rule = NULL;
+ struct rte_flow *flow;
+
+ /*Clear all existing flows before pushing anythin new*/
+ flush_steering_flows(dev);
+ while ((rule = strtok_r(flow_steer_params, ",", &next_rule_ptr))) {
+ char *rule_attribute = NULL;
+ char *next_attribute_ptr = NULL;
+ uint32_t rule_attribute_count = 0;
+ uint16_t q_no;
+ struct eth_addr mac_addr;
+
+ while ((rule_attribute = strtok_r(rule, "|", &next_attribute_ptr))) {
+ switch (rule_attribute_count) {
+ case 0:
+ q_no = strtol(rule_attribute, NULL, 10);
+ break;
+ case 1: ;
+ if (!eth_addr_from_string(rule_attribute, &mac_addr)) {
+ VLOG_ERR("Malformed rxq flow steering mac address"
+ " passed for %s", dev->up.name);
+ }
+ break;
+ default:
+ VLOG_ERR("More than 2 rxq flow steering arguments"
+ " passed for %s", dev->up.name);
+ }
+ rule_attribute_count++;
+ rule = NULL;
+
+ }
+ /*Malformed or no rule set*/
+ if (rule_attribute_count != 2) {
+ break;
+ }
+
+ flow = generate_eth_flow( (uint16_t) dev->port_id, q_no, &mac_addr);
+ if (!flow) {
+ VLOG_INFO("Error adding flow %s:%d:"ETH_ADDR_FMT, dev->up.name,
+ q_no, ETH_ADDR_ARGS(mac_addr));
+ } else {
+ VLOG_INFO("Flow successfully added %s:%d:"ETH_ADDR_FMT,
+ dev->up.name,
+ q_no, ETH_ADDR_ARGS(mac_addr));
+ dev->steering_flows[dev->flow_count++] = flow;
+ }
+
+ if (dev->flow_count >= RXQ_STEERING_FLOWS_MAX) {
+ VLOG_ERR("Steering flow limit reached, Ignoring other flows");
+ break;
+ }
+ flow_steer_params = NULL;
+ }
+ return 0;
+}
+
+
+static int
+flush_steering_flows(struct netdev_dpdk *dev) {
+ struct rte_flow_error error;
+ int i;
+ for (i = 0; i < dev->flow_count; i++) {
+ if (rte_flow_destroy (dev->port_id, dev->steering_flows[i], &error)) {
+ VLOG_ERR("Flow flush failed for interface %s with %s\n",
+ dev->up.name, error.message);
+ }
+ }
+ VLOG_INFO("Flushed %d flows for interface %s", i, dev->up.name);
+ dev->flow_count = 0;
+ return 0;
+}
+
+static struct rte_flow*
+generate_eth_flow(uint16_t port_id, short rx_q, struct eth_addr *dst_addr)
+{
+ struct rte_flow *flow = NULL;
+ struct rte_flow_error error;
+
+ struct rte_flow_item_eth eth = {
+ .type = RTE_BE16(0x0800),
+ };
+ memcpy(eth.dst.addr_bytes, dst_addr->ea, ETH_ADDR_LEN);
+
+ static const struct rte_flow_item_eth eth_mask = {
+ .dst.addr_bytes = {0xff,0xff,0xff,0xff,0xff,0xff},
+ .src.addr_bytes = {0x00,0x00,0x00,0x00,0x00,0x00},
+ .type = RTE_BE16(0xffff),
+ };
+
+ struct rte_flow_attr attr = {
+ .group = 0,
+ .priority = 0,
+ .ingress = 1,
+ .egress = 0,
+ .transfer = 0,
+ };
+
+
+ struct rte_flow_item pattern[] = {
+ {
+ .type = RTE_FLOW_ITEM_TYPE_ETH,
+ .spec = ð,
+ .mask = ð_mask,
+ },
+ { .type = RTE_FLOW_ITEM_TYPE_END },
+ };
+
+ const struct rte_flow_action actions[] = {
+ {
+ .type = RTE_FLOW_ACTION_TYPE_QUEUE,
+ .conf = &(const struct rte_flow_action_queue) {
+ .index = rx_q,
+ },
+ },
+ { .type = RTE_FLOW_ACTION_TYPE_END },
+ };
+
+ if (rte_flow_validate(port_id, &attr, pattern, actions, &error)) {
+ VLOG_ERR("Flow validation failed %s", error.message);
+ return NULL;
+ }
+ flow = rte_flow_create(port_id, &attr, pattern, actions, &error);
+ if (!flow) {
+ VLOG_ERR("Flow creation failed %s", error.message);
+ return NULL;
+ }
+ return flow;
+}
+
+static void
+configure_rxq_isolate(struct netdev_dpdk *dev, bool rxq_isolate) {
+ struct rte_flow_error error;
+
+ if (rte_flow_isolate(dev->port_id, rxq_isolate, &error)) {
+ VLOG_ERR("RXQ isolation failed for %s with error %s\n", dev->up.name,
+ error.message);
+ return;
+ }
+ VLOG_INFO("RXQ isolation set for %s", dev->up.name);
+}