[ovs-dev,v5,4/4] dpif-netdev: Introduce pmd-rxq-affinity.

Message ID 1469630684-9950-5-git-send-email-i.maximets@samsung.com
State Accepted
Delegated to: Daniele Di Proietto

Commit Message

Ilya Maximets July 27, 2016, 2:44 p.m. UTC
Add a new 'other_config:pmd-rxq-affinity' field to the Interface table
to allow manual pinning of RX queues to desired cores.

This functionality is required to achieve maximum performance, because
different kinds of ports have different rx/tx operation costs and only
the user knows the expected workload on each port.

Example:
	# ./bin/ovs-vsctl set interface dpdk0 options:n_rxq=4 \
	                  other_config:pmd-rxq-affinity="0:3,1:7,3:8"
	Queue #0 pinned to core 3;
	Queue #1 pinned to core 7;
	Queue #2 not pinned;
	Queue #3 pinned to core 8.

Cores that have an rxq explicitly assigned to them are automatically
isolated, because this makes it possible to keep a constant polling rate
on some performance-critical ports while adding or deleting other ports
without explicitly pinning every port.  (A toy model of this two-pass
assignment follows the diffstat below.)

Signed-off-by: Ilya Maximets <i.maximets@samsung.com>
---
 INSTALL.DPDK.md      |  49 +++++++++++-
 NEWS                 |   2 +
 lib/dpif-netdev.c    | 216 +++++++++++++++++++++++++++++++++++++++++----------
 tests/pmd.at         |   6 ++
 vswitchd/vswitch.xml |  23 ++++++
 5 files changed, 254 insertions(+), 42 deletions(-)
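
The two-pass assignment described above is small enough to model in
isolation. The following self-contained C sketch is an editor's toy model
of the logic this patch adds to lib/dpif-netdev.c (the pass structure of
dp_netdev_reset_pmd_threads() and dp_netdev_add_port_rx_to_pmds()), not
the actual OVS code; it assumes a pmd-cpu-mask that provides PMD threads
only on cores 3, 7 and 8, matching the example above.

```
/* Toy model of the patch's two-pass rxq distribution: pass 1 honors
 * explicit pins and isolates those PMD threads, pass 2 spreads the
 * remaining queues over the least-loaded non-isolated threads. */
#include <stdio.h>

#define N_PMDS 3
#define N_RXQS 4
#define NO_AFFINITY -1

struct pmd { int core_id; int isolated; int n_polled; };
struct rxq { int id; int pinned_core; int polled_by; };

/* Pass 1: honor explicit pins and isolate the targeted PMDs. */
static void assign_pinned(struct pmd pmds[], struct rxq rxqs[])
{
    for (int i = 0; i < N_RXQS; i++) {
        if (rxqs[i].pinned_core == NO_AFFINITY) {
            continue;
        }
        for (int p = 0; p < N_PMDS; p++) {
            if (pmds[p].core_id == rxqs[i].pinned_core) {
                pmds[p].isolated = 1;
                pmds[p].n_polled++;
                rxqs[i].polled_by = pmds[p].core_id;
            }
        }
        if (rxqs[i].polled_by == NO_AFFINITY) {
            printf("warning: no PMD on core %d, queue %d unpolled\n",
                   rxqs[i].pinned_core, rxqs[i].id);
        }
    }
}

/* Pass 2: spread the rest over the least-loaded non-isolated PMDs. */
static void assign_unpinned(struct pmd pmds[], struct rxq rxqs[])
{
    for (int i = 0; i < N_RXQS; i++) {
        if (rxqs[i].pinned_core != NO_AFFINITY) {
            continue;
        }
        struct pmd *best = NULL;
        for (int p = 0; p < N_PMDS; p++) {
            if (!pmds[p].isolated
                && (!best || pmds[p].n_polled < best->n_polled)) {
                best = &pmds[p];
            }
        }
        if (!best) {
            printf("warning: no non-isolated PMD, queue %d unpolled\n",
                   rxqs[i].id);
            continue;
        }
        best->n_polled++;
        rxqs[i].polled_by = best->core_id;
    }
}

int main(void)
{
    /* pmd-rxq-affinity="0:3,1:7,3:8" with PMDs on cores 3, 7 and 8. */
    struct pmd pmds[N_PMDS] = { {3, 0, 0}, {7, 0, 0}, {8, 0, 0} };
    struct rxq rxqs[N_RXQS] = {
        {0, 3, NO_AFFINITY}, {1, 7, NO_AFFINITY},
        {2, NO_AFFINITY, NO_AFFINITY}, {3, 8, NO_AFFINITY},
    };

    assign_pinned(pmds, rxqs);
    assign_unpinned(pmds, rxqs);

    for (int i = 0; i < N_RXQS; i++) {
        printf("queue %d -> core %d\n", rxqs[i].id, rxqs[i].polled_by);
    }
    return 0;
}
```

With these inputs every PMD thread ends up isolated, so queue 2 triggers
the same warning that the documentation below gives for `non-pinned` RX
queues when no `non-isolated` PMD threads remain.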

Comments

Daniele Di Proietto July 27, 2016, 8 p.m. UTC | #1
Using ofputil_parse_key_value() to parse the affinity list seems like a good idea, thanks!
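
For readers outside the OVS tree, here is a minimal sketch of that parse
loop. parse_pair() is a hypothetical stand-in for the
ofputil_parse_key_value() + str_to_int() combination used by the patch's
parse_affinity_list(), so the example compiles on its own (strsep() is a
BSD/glibc extension, not ISO C):

```
/* Splits "0:3,1:7,3:8" into <queue-id, core-id> pairs, mirroring the
 * while (ofputil_parse_key_value(...)) loop in parse_affinity_list(). */
#include <stdio.h>
#include <string.h>

/* Pulls the next "key:value" token off *list; returns 0 when the list
 * is exhausted or a token is malformed (the patch returns EINVAL). */
static int parse_pair(char **list, int *rxq_id, int *core_id)
{
    char *pair = strsep(list, ",");

    if (!pair || !*pair) {
        return 0;
    }
    return sscanf(pair, "%d:%d", rxq_id, core_id) == 2;
}

int main(void)
{
    char buf[] = "0:3,1:7,3:8";
    char *list = buf;
    int rxq_id, core_id;

    while (parse_pair(&list, &rxq_id, &core_id)) {
        printf("queue %d pinned to core %d\n", rxq_id, core_id);
    }
    return 0;
}
```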


I got a compiler warning on an unused variable.  I fixed that and applied the series to master.

Thanks,

Daniele

Patch

diff --git a/INSTALL.DPDK.md b/INSTALL.DPDK.md
index 5407794..7609aa7 100644
--- a/INSTALL.DPDK.md
+++ b/INSTALL.DPDK.md
@@ -289,14 +289,57 @@  advanced install guide [INSTALL.DPDK-ADVANCED.md]
      # Check current stats
        ovs-appctl dpif-netdev/pmd-stats-show
 
+     # Clear previous stats
+       ovs-appctl dpif-netdev/pmd-stats-clear
+     ```
+
+  7. Port/rxq assignment to PMD threads
+
+     ```
      # Show port/rxq assignment
        ovs-appctl dpif-netdev/pmd-rxq-show
+     ```
 
-     # Clear previous stats
-       ovs-appctl dpif-netdev/pmd-stats-clear
+     To change the default rxq assignment to pmd threads, rxqs may be
+     manually pinned to desired cores using:
+
+     ```
+     ovs-vsctl set Interface <iface> \
+               other_config:pmd-rxq-affinity=<rxq-affinity-list>
      ```
+     where:
+
+     ```
+     <rxq-affinity-list> ::= NULL | <non-empty-list>
+     <non-empty-list> ::= <affinity-pair> |
+                          <affinity-pair> , <non-empty-list>
+     <affinity-pair> ::= <queue-id> : <core-id>
+     ```
+
+     Example:
+
+     ```
+     ovs-vsctl set interface dpdk0 options:n_rxq=4 \
+               other_config:pmd-rxq-affinity="0:3,1:7,3:8"
+
+     Queue #0 pinned to core 3;
+     Queue #1 pinned to core 7;
+     Queue #2 not pinned;
+     Queue #3 pinned to core 8.
+     ```
+
+     After that, PMD threads on cores where RX queues were pinned will become
+     `isolated`. This means that these threads will poll only pinned RX queues.
+
+     WARNING: If there are no `non-isolated` PMD threads, `non-pinned` RX queues
+     will not be polled. Also, if the provided `core_id` is not available (e.g.
+     the `core_id` is not in `pmd-cpu-mask`), the RX queue will not be polled by
+     any PMD thread.
+
+     Isolation of PMD threads can also be checked using the
+     `ovs-appctl dpif-netdev/pmd-rxq-show` command.
 
-  7. Stop vswitchd & Delete bridge
+  8. Stop vswitchd & Delete bridge
 
      ```
      ovs-appctl -t ovs-vswitchd exit
diff --git a/NEWS b/NEWS
index 73d3fcf..1a34f75 100644
--- a/NEWS
+++ b/NEWS
@@ -45,6 +45,8 @@  Post-v2.5.0
        Old 'other_config:n-dpdk-rxqs' is no longer supported.
        Not supported by vHost interfaces. For them number of rx and tx queues
        is applied from connected virtio device.
+     * New 'other_config:pmd-rxq-affinity' field for PMD interfaces, which
+       allows pinning of a port's rx queues to desired cores.
      * New appctl command 'dpif-netdev/pmd-rxq-show' to check the port/rxq
        assignment.
      * Type of log messages from PMD threads changed from INFO to DBG.
diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index 1ef0cd7..33f1216 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -53,7 +53,9 @@ 
 #include "openvswitch/list.h"
 #include "openvswitch/match.h"
 #include "openvswitch/ofp-print.h"
+#include "openvswitch/ofp-util.h"
 #include "openvswitch/ofpbuf.h"
+#include "openvswitch/shash.h"
 #include "openvswitch/vlog.h"
 #include "ovs-numa.h"
 #include "ovs-rcu.h"
@@ -62,7 +64,7 @@ 
 #include "pvector.h"
 #include "random.h"
 #include "seq.h"
-#include "openvswitch/shash.h"
+#include "smap.h"
 #include "sset.h"
 #include "timeval.h"
 #include "tnl-neigh-cache.h"
@@ -252,6 +254,12 @@  enum pmd_cycles_counter_type {
 
 #define XPS_TIMEOUT_MS 500LL
 
+/* Contained by struct dp_netdev_port's 'rxqs' member.  */
+struct dp_netdev_rxq {
+    struct netdev_rxq *rxq;
+    unsigned core_id;           /* Core to which this queue is pinned. */
+};
+
 /* A port in a netdev-based datapath. */
 struct dp_netdev_port {
     odp_port_t port_no;
@@ -259,11 +267,12 @@  struct dp_netdev_port {
     struct hmap_node node;      /* Node in dp_netdev's 'ports'. */
     struct netdev_saved_flags *sf;
     unsigned n_rxq;             /* Number of elements in 'rxq' */
-    struct netdev_rxq **rxq;
+    struct dp_netdev_rxq *rxqs;
     atomic_bool dynamic_txqs;   /* If true XPS will be used. */
     unsigned *txq_used;         /* Number of threads that uses each tx queue. */
     struct ovs_mutex txq_used_mutex;
     char *type;                 /* Port type as requested by user. */
+    char *rxq_affinity_list;    /* Requested affinity of rx queues. */
 };
 
 /* Contained by struct dp_netdev_flow's 'stats' member.  */
@@ -450,6 +459,7 @@  struct dp_netdev_pmd_thread {
     pthread_t thread;
     unsigned core_id;               /* CPU core id of this pmd thread. */
     int numa_id;                    /* numa node id of this pmd thread. */
+    bool isolated;                  /* Thread polls only pinned rxqs. */
 
     /* Queue id used by this pmd thread to send packets on all netdevs if
      * XPS disabled for this netdev. All static_tx_qid's are unique and less
@@ -545,6 +555,8 @@  static struct dp_netdev_pmd_thread *
 dp_netdev_less_loaded_pmd_on_numa(struct dp_netdev *dp, int numa_id);
 static void dp_netdev_reset_pmd_threads(struct dp_netdev *dp)
     OVS_REQUIRES(dp->port_mutex);
+static void reconfigure_pmd_threads(struct dp_netdev *dp)
+    OVS_REQUIRES(dp->port_mutex);
 static bool dp_netdev_pmd_try_ref(struct dp_netdev_pmd_thread *pmd);
 static void dp_netdev_pmd_unref(struct dp_netdev_pmd_thread *pmd);
 static void dp_netdev_pmd_flow_flush(struct dp_netdev_pmd_thread *pmd);
@@ -735,8 +747,10 @@  pmd_info_show_rxq(struct ds *reply, struct dp_netdev_pmd_thread *pmd)
         struct rxq_poll *poll;
         const char *prev_name = NULL;
 
-        ds_put_format(reply, "pmd thread numa_id %d core_id %u:\n",
-                      pmd->numa_id, pmd->core_id);
+        ds_put_format(reply,
+                      "pmd thread numa_id %d core_id %u:\n\tisolated : %s\n",
+                      pmd->numa_id, pmd->core_id, (pmd->isolated)
+                                                  ? "true" : "false");
 
         ovs_mutex_lock(&pmd->port_mutex);
         LIST_FOR_EACH (poll, node, &pmd->poll_list) {
@@ -1221,19 +1235,20 @@  port_create(const char *devname, const char *open_type, const char *type,
     port->port_no = port_no;
     port->netdev = netdev;
     port->n_rxq = netdev_n_rxq(netdev);
-    port->rxq = xcalloc(port->n_rxq, sizeof *port->rxq);
+    port->rxqs = xcalloc(port->n_rxq, sizeof *port->rxqs);
     port->txq_used = xcalloc(netdev_n_txq(netdev), sizeof *port->txq_used);
     port->type = xstrdup(type);
     ovs_mutex_init(&port->txq_used_mutex);
     atomic_init(&port->dynamic_txqs, dynamic_txqs);
 
     for (i = 0; i < port->n_rxq; i++) {
-        error = netdev_rxq_open(netdev, &port->rxq[i], i);
+        error = netdev_rxq_open(netdev, &port->rxqs[i].rxq, i);
         if (error) {
             VLOG_ERR("%s: cannot receive packets on this network device (%s)",
                      devname, ovs_strerror(errno));
             goto out_rxq_close;
         }
+        port->rxqs[i].core_id = -1;
         n_open_rxqs++;
     }
 
@@ -1249,12 +1264,12 @@  port_create(const char *devname, const char *open_type, const char *type,
 
 out_rxq_close:
     for (i = 0; i < n_open_rxqs; i++) {
-        netdev_rxq_close(port->rxq[i]);
+        netdev_rxq_close(port->rxqs[i].rxq);
     }
     ovs_mutex_destroy(&port->txq_used_mutex);
     free(port->type);
     free(port->txq_used);
-    free(port->rxq);
+    free(port->rxqs);
     free(port);
 
 out:
@@ -1391,11 +1406,12 @@  port_destroy(struct dp_netdev_port *port)
     netdev_restore_flags(port->sf);
 
     for (unsigned i = 0; i < port->n_rxq; i++) {
-        netdev_rxq_close(port->rxq[i]);
+        netdev_rxq_close(port->rxqs[i].rxq);
     }
     ovs_mutex_destroy(&port->txq_used_mutex);
+    free(port->rxq_affinity_list);
     free(port->txq_used);
-    free(port->rxq);
+    free(port->rxqs);
     free(port->type);
     free(port);
 }
@@ -2573,6 +2589,96 @@  dpif_netdev_pmd_set(struct dpif *dpif, const char *cmask)
     return 0;
 }
 
+/* Parses affinity list and returns result in 'core_ids'. */
+static int
+parse_affinity_list(const char *affinity_list, unsigned *core_ids, int n_rxq)
+{
+    unsigned i;
+    char *list, *copy, *key, *value;
+    int error = 0;
+
+    for (i = 0; i < n_rxq; i++) {
+        core_ids[i] = -1;
+    }
+
+    if (!affinity_list) {
+        return 0;
+    }
+
+    list = copy = xstrdup(affinity_list);
+
+    while (ofputil_parse_key_value(&list, &key, &value)) {
+        int rxq_id, core_id;
+
+        if (!str_to_int(key, 0, &rxq_id) || rxq_id < 0
+            || !str_to_int(value, 0, &core_id) || core_id < 0) {
+            error = EINVAL;
+            break;
+        }
+
+        if (rxq_id < n_rxq) {
+            core_ids[rxq_id] = core_id;
+        }
+    }
+
+    free(copy);
+    return error;
+}
+
+/* Parses 'affinity_list' and applies configuration if it is valid. */
+static int
+dpif_netdev_port_set_rxq_affinity(struct dp_netdev_port *port,
+                                  const char *affinity_list)
+{
+    unsigned *core_ids, i;
+    int error = 0;
+
+    core_ids = xmalloc(port->n_rxq * sizeof *core_ids);
+    if (parse_affinity_list(affinity_list, core_ids, port->n_rxq)) {
+        error = EINVAL;
+        goto exit;
+    }
+
+    for (i = 0; i < port->n_rxq; i++) {
+        port->rxqs[i].core_id = core_ids[i];
+    }
+
+exit:
+    free(core_ids);
+    return error;
+}
+
+/* Changes the affinity of port's rx queues.  The changes are actually applied
+ * in dpif_netdev_run(). */
+static int
+dpif_netdev_port_set_config(struct dpif *dpif, odp_port_t port_no,
+                            const struct smap *cfg)
+{
+    struct dp_netdev *dp = get_dp_netdev(dpif);
+    struct dp_netdev_port *port;
+    int error = 0;
+    const char *affinity_list = smap_get(cfg, "pmd-rxq-affinity");
+
+    ovs_mutex_lock(&dp->port_mutex);
+    error = get_port_by_number(dp, port_no, &port);
+    if (error || !netdev_is_pmd(port->netdev)
+        || nullable_string_is_equal(affinity_list, port->rxq_affinity_list)) {
+        goto unlock;
+    }
+
+    error = dpif_netdev_port_set_rxq_affinity(port, affinity_list);
+    if (error) {
+        goto unlock;
+    }
+    free(port->rxq_affinity_list);
+    port->rxq_affinity_list = nullable_xstrdup(affinity_list);
+
+    dp_netdev_request_reconfigure(dp);
+unlock:
+    ovs_mutex_unlock(&dp->port_mutex);
+    return error;
+}
+
 static int
 dpif_netdev_queue_to_priority(const struct dpif *dpif OVS_UNUSED,
                               uint32_t queue_id, uint32_t *priority)
@@ -2672,7 +2778,7 @@  static int
 port_reconfigure(struct dp_netdev_port *port)
 {
     struct netdev *netdev = port->netdev;
-    int i, err;
+    int i, err, old_n_rxq;
 
     if (!netdev_is_reconf_required(netdev)) {
         return 0;
@@ -2680,9 +2786,10 @@  port_reconfigure(struct dp_netdev_port *port)
 
     /* Closes the existing 'rxq's. */
     for (i = 0; i < port->n_rxq; i++) {
-        netdev_rxq_close(port->rxq[i]);
-        port->rxq[i] = NULL;
+        netdev_rxq_close(port->rxqs[i].rxq);
+        port->rxqs[i].rxq = NULL;
     }
+    old_n_rxq = port->n_rxq;
     port->n_rxq = 0;
 
     /* Allows 'netdev' to apply the pending configuration changes. */
@@ -2693,19 +2800,23 @@  port_reconfigure(struct dp_netdev_port *port)
         return err;
     }
     /* If the netdev_reconfigure() above succeeds, reopens the 'rxq's. */
-    port->rxq = xrealloc(port->rxq, sizeof *port->rxq * netdev_n_rxq(netdev));
+    port->rxqs = xrealloc(port->rxqs,
+                          sizeof *port->rxqs * netdev_n_rxq(netdev));
     /* Realloc 'used' counters for tx queues. */
     free(port->txq_used);
     port->txq_used = xcalloc(netdev_n_txq(netdev), sizeof *port->txq_used);
 
     for (i = 0; i < netdev_n_rxq(netdev); i++) {
-        err = netdev_rxq_open(netdev, &port->rxq[i], i);
+        err = netdev_rxq_open(netdev, &port->rxqs[i].rxq, i);
         if (err) {
             return err;
         }
         port->n_rxq++;
     }
 
+    /* Parse affinity list to apply configuration for new queues. */
+    dpif_netdev_port_set_rxq_affinity(port, port->rxq_affinity_list);
+
     return 0;
 }
 
@@ -2781,7 +2892,7 @@  dpif_netdev_run(struct dpif *dpif)
             int i;
 
             for (i = 0; i < port->n_rxq; i++) {
-                dp_netdev_process_rxq_port(non_pmd, port, port->rxq[i]);
+                dp_netdev_process_rxq_port(non_pmd, port, port->rxqs[i].rxq);
             }
         }
     }
@@ -2820,7 +2931,7 @@  dpif_netdev_wait(struct dpif *dpif)
             int i;
 
             for (i = 0; i < port->n_rxq; i++) {
-                netdev_rxq_wait(port->rxq[i]);
+                netdev_rxq_wait(port->rxqs[i].rxq);
             }
         }
     }
@@ -3321,9 +3432,9 @@  dp_netdev_del_port_from_all_pmds(struct dp_netdev *dp,
 }
 
 
-/* Returns PMD thread from this numa node with fewer rx queues to poll.
- * Returns NULL if there is no PMD threads on this numa node.
- * Can be called safely only by main thread. */
+/* Returns non-isolated PMD thread from this numa node with fewer
+ * rx queues to poll. Returns NULL if there are no non-isolated PMD threads
+ * on this numa node. Can be called safely only by main thread. */
 static struct dp_netdev_pmd_thread *
 dp_netdev_less_loaded_pmd_on_numa(struct dp_netdev *dp, int numa_id)
 {
@@ -3331,7 +3442,7 @@  dp_netdev_less_loaded_pmd_on_numa(struct dp_netdev *dp, int numa_id)
     struct dp_netdev_pmd_thread *pmd, *res = NULL;
 
     CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
-        if (pmd->numa_id == numa_id
+        if (!pmd->isolated && pmd->numa_id == numa_id
             && (min_cnt > pmd->poll_cnt || res == NULL)) {
             min_cnt = pmd->poll_cnt;
             res = pmd;
@@ -3372,14 +3483,16 @@  dp_netdev_add_port_tx_to_pmd(struct dp_netdev_pmd_thread *pmd,
     ovs_mutex_unlock(&pmd->port_mutex);
 }
 
-/* Distribute all rx queues of 'port' between PMD threads in 'dp'. The pmd
- * threads that need to be restarted are inserted in 'to_reload'. */
+/* Distribute all {pinned|non-pinned} rx queues of 'port' between PMD
+ * threads in 'dp'. The pmd threads that need to be restarted are inserted
+ * in 'to_reload'. PMD threads with pinned queues marked as isolated. */
 static void
 dp_netdev_add_port_rx_to_pmds(struct dp_netdev *dp,
                               struct dp_netdev_port *port,
-                              struct hmapx *to_reload)
+                              struct hmapx *to_reload, bool pinned)
 {
     int numa_id = netdev_get_numa_id(port->netdev);
+    struct dp_netdev_pmd_thread *pmd;
     int i;
 
     if (!netdev_is_pmd(port->netdev)) {
@@ -3387,32 +3500,50 @@  dp_netdev_add_port_rx_to_pmds(struct dp_netdev *dp,
     }
 
     for (i = 0; i < port->n_rxq; i++) {
-        struct dp_netdev_pmd_thread *pmd;
-
-        pmd = dp_netdev_less_loaded_pmd_on_numa(dp, numa_id);
-        if (!pmd) {
-            VLOG_WARN("There's no pmd thread on numa node %d", numa_id);
-            break;
+        if (pinned) {
+            if (port->rxqs[i].core_id == -1) {
+                continue;
+            }
+            pmd = dp_netdev_get_pmd(dp, port->rxqs[i].core_id);
+            if (!pmd) {
+                VLOG_WARN("There is no PMD thread on core %d. "
+                          "Queue %d on port \'%s\' will not be polled.",
+                          port->rxqs[i].core_id, i,
+                          netdev_get_name(port->netdev));
+                continue;
+            }
+            pmd->isolated = true;
+            dp_netdev_pmd_unref(pmd);
+        } else {
+            if (port->rxqs[i].core_id != -1) {
+                continue;
+            }
+            pmd = dp_netdev_less_loaded_pmd_on_numa(dp, numa_id);
+            if (!pmd) {
+                VLOG_WARN("There's no available pmd thread on numa node %d",
+                          numa_id);
+                break;
+            }
         }
 
         ovs_mutex_lock(&pmd->port_mutex);
-        dp_netdev_add_rxq_to_pmd(pmd, port, port->rxq[i]);
+        dp_netdev_add_rxq_to_pmd(pmd, port, port->rxqs[i].rxq);
         ovs_mutex_unlock(&pmd->port_mutex);
 
         hmapx_add(to_reload, pmd);
     }
 }
 
-/* Distributes all rx queues of 'port' between all PMD threads in 'dp' and
- * inserts 'port' in the PMD threads 'tx_ports'. The pmd threads that need to
- * be restarted are inserted in 'to_reload'. */
+/* Distributes all non-pinned rx queues of 'port' between all PMD threads
+ * in 'dp' and inserts 'port' in the PMD threads 'tx_ports'. The pmd threads
+ * that need to be restarted are inserted in 'to_reload'. */
 static void
 dp_netdev_add_port_to_pmds__(struct dp_netdev *dp, struct dp_netdev_port *port,
                              struct hmapx *to_reload)
 {
     struct dp_netdev_pmd_thread *pmd;
 
-    dp_netdev_add_port_rx_to_pmds(dp, port, to_reload);
+    dp_netdev_add_port_rx_to_pmds(dp, port, to_reload, false);
 
     CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
         dp_netdev_add_port_tx_to_pmd(pmd, port);
@@ -3420,8 +3551,9 @@  dp_netdev_add_port_to_pmds__(struct dp_netdev *dp, struct dp_netdev_port *port,
     }
 }
 
-/* Distributes all rx queues of 'port' between all PMD threads in 'dp', inserts
- * 'port' in the PMD threads 'tx_ports' and reloads them, if needed. */
+/* Distributes all non-pinned rx queues of 'port' between all PMD threads
+ * in 'dp', inserts 'port' in the PMD threads 'tx_ports' and reloads them,
+ * if needed. */
 static void
 dp_netdev_add_port_to_pmds(struct dp_netdev *dp, struct dp_netdev_port *port)
 {
@@ -3506,7 +3638,13 @@  dp_netdev_reset_pmd_threads(struct dp_netdev *dp)
 
             dp_netdev_set_pmds_on_numa(dp, numa_id);
         }
-        dp_netdev_add_port_rx_to_pmds(dp, port, &to_reload);
+        /* Distribute only pinned rx queues first to mark threads as
+         * isolated. */
+        dp_netdev_add_port_rx_to_pmds(dp, port, &to_reload, true);
+    }
+
+    /* Distribute remaining non-pinned rx queues to non-isolated PMD threads. */
+    HMAP_FOR_EACH (port, node, &dp->ports) {
+        dp_netdev_add_port_rx_to_pmds(dp, port, &to_reload, false);
     }
 
     HMAPX_FOR_EACH (node, &to_reload) {
@@ -4366,7 +4504,7 @@  const struct dpif_class dpif_netdev_class = {
     dpif_netdev_get_stats,
     dpif_netdev_port_add,
     dpif_netdev_port_del,
-    NULL,                       /* port_set_config */
+    dpif_netdev_port_set_config,
     dpif_netdev_port_query_by_number,
     dpif_netdev_port_query_by_name,
     NULL,                       /* port_get_pid */
diff --git a/tests/pmd.at b/tests/pmd.at
index 3216762..47639b6 100644
--- a/tests/pmd.at
+++ b/tests/pmd.at
@@ -63,6 +63,7 @@  CHECK_PMD_THREADS_CREATED()
 
 AT_CHECK([ovs-appctl dpif-netdev/pmd-rxq-show | sed SED_NUMA_CORE_PATTERN], [0], [dnl
 pmd thread numa_id <cleared> core_id <cleared>:
+	isolated : false
 	port: p0	queue-id: 0
 ])
 
@@ -93,6 +94,7 @@  dummy@ovs-dummy: hit:0 missed:0
 
 AT_CHECK([ovs-appctl dpif-netdev/pmd-rxq-show | sed SED_NUMA_CORE_PATTERN], [0], [dnl
 pmd thread numa_id <cleared> core_id <cleared>:
+	isolated : false
 	port: p0	queue-id: 0 1 2 3 4 5 6 7
 ])
 
@@ -116,6 +118,7 @@  dummy@ovs-dummy: hit:0 missed:0
 
 AT_CHECK([ovs-appctl dpif-netdev/pmd-rxq-show | sed SED_NUMA_CORE_PATTERN], [0], [dnl
 pmd thread numa_id <cleared> core_id <cleared>:
+	isolated : false
 	port: p0	queue-id: 0 1 2 3 4 5 6 7
 ])
 
@@ -125,8 +128,10 @@  CHECK_PMD_THREADS_CREATED([2], [], [+$TMP])
 
 AT_CHECK([ovs-appctl dpif-netdev/pmd-rxq-show | sed SED_NUMA_CORE_PATTERN], [0], [dnl
 pmd thread numa_id <cleared> core_id <cleared>:
+	isolated : false
 	port: p0	queue-id: 0 2 4 6
 pmd thread numa_id <cleared> core_id <cleared>:
+	isolated : false
 	port: p0	queue-id: 1 3 5 7
 ])
 
@@ -136,6 +141,7 @@  CHECK_PMD_THREADS_CREATED([1], [], [+$TMP])
 
 AT_CHECK([ovs-appctl dpif-netdev/pmd-rxq-show | sed SED_NUMA_CORE_PATTERN], [0], [dnl
 pmd thread numa_id <cleared> core_id <cleared>:
+	isolated : false
 	port: p0	queue-id: 0 1 2 3 4 5 6 7
 ])
 
diff --git a/vswitchd/vswitch.xml b/vswitchd/vswitch.xml
index fed6f56..f0e1381 100644
--- a/vswitchd/vswitch.xml
+++ b/vswitchd/vswitch.xml
@@ -2355,6 +2355,29 @@ 
           Not supported by DPDK vHost interfaces.
         </p>
       </column>
+
+      <column name="other_config" key="pmd-rxq-affinity">
+        <p>Specifies mapping of RX queues of this interface to CPU cores.</p>
+        <p>The value should be set in the following form:</p>
+        <p>
+          <code>other_config:pmd-rxq-affinity=&lt;rxq-affinity-list&gt;</code>
+        </p>
+        <p>where</p>
+        <p>
+          <ul>
+            <li>
+              &lt;rxq-affinity-list&gt; ::= NULL | &lt;non-empty-list&gt;
+            </li>
+            <li>
+              &lt;non-empty-list&gt; ::= &lt;affinity-pair&gt; |
+                                  &lt;affinity-pair&gt; , &lt;non-empty-list&gt;
+            </li>
+            <li>
+              &lt;affinity-pair&gt; ::= &lt;queue-id&gt; : &lt;core-id&gt;
+            </li>
+          </ul>
+        </p>
+      </column>
     </group>
 
     <group title="Interface Status">