diff mbox

[ovs-dev,v9,07/15] dpif-netdev: Add pmd thread local port cache for transmission.

Message ID 1461373387-78453-8-git-send-email-diproiettod@vmware.com
State Accepted, archived
Headers show

Commit Message

Daniele Di Proietto April 23, 2016, 1:02 a.m. UTC
A future commit will stop using RCU for 'dp->ports' and use a mutex for
reading/writing them.  To avoid taking a mutex in dp_execute_cb(), which
is called in the fast path, this commit introduces a pmd thread local
cache of ports.

The downside is that every port add/remove now needs to synchronize with
every pmd thread.

Among the advantages, keeping a per thread port mapping could allow
greater control over the txq assignment.

Signed-off-by: Daniele Di Proietto <diproiettod@vmware.com>
---
 lib/dpif-netdev.c | 348 ++++++++++++++++++++++++++++++++++++++----------------
 1 file changed, 244 insertions(+), 104 deletions(-)
diff mbox

Patch

diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index fbd23cf..a96bebf 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -184,6 +184,7 @@  static bool dpcls_lookup(const struct dpcls *cls,
  *
  *    dp_netdev_mutex (global)
  *    port_mutex
+ *    non_pmd_mutex
  */
 struct dp_netdev {
     const struct dpif_class *const class;
@@ -379,6 +380,13 @@  struct rxq_poll {
     struct ovs_list node;
 };
 
+/* Contained by struct dp_netdev_pmd_thread's 'port_cache' or 'tx_ports'. */
+struct tx_port {
+    odp_port_t port_no;
+    struct netdev *netdev;
+    struct hmap_node node;
+};
+
 /* PMD: Poll modes drivers.  PMD accesses devices via polling to eliminate
  * the performance overhead of interrupt processing.  Therefore netdev can
  * not implement rx-wait for these devices.  dpif-netdev needs to poll
@@ -405,8 +413,8 @@  struct dp_netdev_pmd_thread {
 
     /* Per thread exact-match cache.  Note, the instance for cpu core
      * NON_PMD_CORE_ID can be accessed by multiple threads, and thusly
-     * need to be protected (e.g. by 'dp_netdev_mutex').  All other
-     * instances will only be accessed by its own pmd thread. */
+     * need to be protected by 'non_pmd_mutex'.  Every other instance
+     * will only be accessed by its own pmd thread. */
     struct emc_cache flow_cache;
 
     /* Classifier and Flow-Table.
@@ -435,10 +443,20 @@  struct dp_netdev_pmd_thread {
     atomic_int tx_qid;              /* Queue id used by this pmd thread to
                                      * send packets on all netdevs */
 
-    struct ovs_mutex poll_mutex;    /* Mutex for poll_list. */
+    struct ovs_mutex port_mutex;    /* Mutex for 'poll_list' and 'tx_ports'. */
     /* List of rx queues to poll. */
     struct ovs_list poll_list OVS_GUARDED;
-    int poll_cnt;                   /* Number of elemints in poll_list. */
+    /* Number of elements in 'poll_list' */
+    int poll_cnt;
+    /* Map of 'tx_port's used for transmission.  Written by the main thread,
+     * read by the pmd thread. */
+    struct hmap tx_ports OVS_GUARDED;
+
+    /* Map of 'tx_port' used in the fast path. This is a thread-local copy of
+     * 'tx_ports'. The instance for cpu core NON_PMD_CORE_ID can be accessed
+     * by multiple threads, and thus needs to be protected by 'non_pmd_mutex'.
+     * Every other instance will only be accessed by its own pmd thread. */
+    struct hmap port_cache;
 
     /* Only a pmd thread can write on its own 'cycles' and 'stats'.
      * The main thread keeps 'stats_zero' and 'cycles_zero' as base
@@ -494,20 +512,24 @@  dp_netdev_pmd_get_next(struct dp_netdev *dp, struct cmap_position *pos);
 static void dp_netdev_destroy_all_pmds(struct dp_netdev *dp);
 static void dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int numa_id);
 static void dp_netdev_set_pmds_on_numa(struct dp_netdev *dp, int numa_id);
-static void dp_netdev_pmd_clear_poll_list(struct dp_netdev_pmd_thread *pmd);
+static void dp_netdev_pmd_clear_ports(struct dp_netdev_pmd_thread *pmd);
 static void dp_netdev_del_port_from_all_pmds(struct dp_netdev *dp,
                                              struct dp_netdev_port *port);
-static void
-dp_netdev_add_port_to_pmds(struct dp_netdev *dp, struct dp_netdev_port *port);
-static void
-dp_netdev_add_rxq_to_pmd(struct dp_netdev_pmd_thread *pmd,
-                         struct dp_netdev_port *port, struct netdev_rxq *rx);
+static void dp_netdev_add_port_to_pmds(struct dp_netdev *dp,
+                                       struct dp_netdev_port *port);
+static void dp_netdev_add_port_tx_to_pmd(struct dp_netdev_pmd_thread *pmd,
+                                         struct dp_netdev_port *port);
+static void dp_netdev_add_rxq_to_pmd(struct dp_netdev_pmd_thread *pmd,
+                                     struct dp_netdev_port *port,
+                                     struct netdev_rxq *rx);
 static struct dp_netdev_pmd_thread *
 dp_netdev_less_loaded_pmd_on_numa(struct dp_netdev *dp, int numa_id);
 static void dp_netdev_reset_pmd_threads(struct dp_netdev *dp);
 static bool dp_netdev_pmd_try_ref(struct dp_netdev_pmd_thread *pmd);
 static void dp_netdev_pmd_unref(struct dp_netdev_pmd_thread *pmd);
 static void dp_netdev_pmd_flow_flush(struct dp_netdev_pmd_thread *pmd);
+static void pmd_load_cached_ports(struct dp_netdev_pmd_thread *pmd)
+    OVS_REQUIRES(pmd->port_mutex);
 
 static inline bool emc_entry_alive(struct emc_entry *ce);
 static void emc_clear_entry(struct emc_entry *ce);
@@ -690,7 +712,7 @@  pmd_info_show_rxq(struct ds *reply, struct dp_netdev_pmd_thread *pmd)
         ds_put_format(reply, "pmd thread numa_id %d core_id %u:\n",
                       pmd->numa_id, pmd->core_id);
 
-        ovs_mutex_lock(&pmd->poll_mutex);
+        ovs_mutex_lock(&pmd->port_mutex);
         LIST_FOR_EACH (poll, node, &pmd->poll_list) {
             const char *name = netdev_get_name(poll->port->netdev);
 
@@ -704,7 +726,7 @@  pmd_info_show_rxq(struct ds *reply, struct dp_netdev_pmd_thread *pmd)
             ds_put_format(reply, " %d", netdev_rxq_get_queue_id(poll->rx));
             prev_name = name;
         }
-        ovs_mutex_unlock(&pmd->poll_mutex);
+        ovs_mutex_unlock(&pmd->port_mutex);
         ds_put_cstr(reply, "\n");
     }
 }
@@ -1077,6 +1099,11 @@  dp_netdev_reload_pmd__(struct dp_netdev_pmd_thread *pmd)
     int old_seq;
 
     if (pmd->core_id == NON_PMD_CORE_ID) {
+        ovs_mutex_lock(&pmd->dp->non_pmd_mutex);
+        ovs_mutex_lock(&pmd->port_mutex);
+        pmd_load_cached_ports(pmd);
+        ovs_mutex_unlock(&pmd->port_mutex);
+        ovs_mutex_unlock(&pmd->dp->non_pmd_mutex);
         return;
     }
 
@@ -1197,11 +1224,16 @@  do_add_port(struct dp_netdev *dp, const char *devname, const char *type,
         return error;
     }
 
-    cmap_insert(&dp->ports, &port->node, hash_port_no(port_no));
-
     if (netdev_is_pmd(port->netdev)) {
-        dp_netdev_add_port_to_pmds(dp, port);
+        int numa_id = netdev_get_numa_id(port->netdev);
+
+        ovs_assert(ovs_numa_numa_id_is_valid(numa_id));
+        dp_netdev_set_pmds_on_numa(dp, numa_id);
     }
+
+    dp_netdev_add_port_to_pmds(dp, port);
+
+    cmap_insert(&dp->ports, &port->node, hash_port_no(port_no));
     seq_change(dp->port_seq);
 
     return 0;
@@ -1370,17 +1402,18 @@  do_del_port(struct dp_netdev *dp, struct dp_netdev_port *port)
 {
     cmap_remove(&dp->ports, &port->node, hash_odp_port(port->port_no));
     seq_change(dp->port_seq);
+
+    dp_netdev_del_port_from_all_pmds(dp, port);
+
     if (netdev_is_pmd(port->netdev)) {
         int numa_id = netdev_get_numa_id(port->netdev);
 
         /* PMD threads can not be on invalid numa node. */
         ovs_assert(ovs_numa_numa_id_is_valid(numa_id));
         /* If there is no netdev on the numa node, deletes the pmd threads
-         * for that numa.  Else, deletes the queues from polling lists. */
+         * for that numa. */
         if (!has_pmd_port_for_numa(dp, numa_id)) {
             dp_netdev_del_pmds_on_numa(dp, numa_id);
-        } else {
-            dp_netdev_del_port_from_all_pmds(dp, port);
         }
     }
 
@@ -2377,7 +2410,6 @@  dpif_netdev_execute(struct dpif *dpif, struct dpif_execute *execute)
      * the 'non_pmd_mutex'. */
     if (pmd->core_id == NON_PMD_CORE_ID) {
         ovs_mutex_lock(&dp->non_pmd_mutex);
-        ovs_mutex_lock(&dp->port_mutex);
     }
 
     pp = execute->packet;
@@ -2385,7 +2417,6 @@  dpif_netdev_execute(struct dpif *dpif, struct dpif_execute *execute)
                               execute->actions_len);
     if (pmd->core_id == NON_PMD_CORE_ID) {
         dp_netdev_pmd_unref(pmd);
-        ovs_mutex_unlock(&dp->port_mutex);
         ovs_mutex_unlock(&dp->non_pmd_mutex);
     }
 
@@ -2649,21 +2680,53 @@  dpif_netdev_wait(struct dpif *dpif)
     seq_wait(tnl_conf_seq, dp->last_tnl_conf_seq);
 }
 
+static void
+pmd_free_cached_ports(struct dp_netdev_pmd_thread *pmd)
+{
+    struct tx_port *tx_port_cached;
+
+    HMAP_FOR_EACH_POP (tx_port_cached, node, &pmd->port_cache) {
+        free(tx_port_cached);
+    }
+}
+
+/* Copies ports from 'pmd->tx_ports' (shared with the main thread) to
+ * 'pmd->port_cache' (thread local) */
+static void
+pmd_load_cached_ports(struct dp_netdev_pmd_thread *pmd)
+    OVS_REQUIRES(pmd->port_mutex)
+{
+    struct tx_port *tx_port, *tx_port_cached;
+
+    pmd_free_cached_ports(pmd);
+    hmap_shrink(&pmd->port_cache);
+
+    HMAP_FOR_EACH (tx_port, node, &pmd->tx_ports) {
+        tx_port_cached = xmemdup(tx_port, sizeof *tx_port_cached);
+        hmap_insert(&pmd->port_cache, &tx_port_cached->node,
+                    hash_port_no(tx_port_cached->port_no));
+    }
+}
+
 static int
-pmd_load_queues(struct dp_netdev_pmd_thread *pmd, struct rxq_poll **ppoll_list)
+pmd_load_queues_and_ports(struct dp_netdev_pmd_thread *pmd,
+                          struct rxq_poll **ppoll_list)
 {
     struct rxq_poll *poll_list = *ppoll_list;
     struct rxq_poll *poll;
     int i;
 
-    ovs_mutex_lock(&pmd->poll_mutex);
+    ovs_mutex_lock(&pmd->port_mutex);
     poll_list = xrealloc(poll_list, pmd->poll_cnt * sizeof *poll_list);
 
     i = 0;
     LIST_FOR_EACH (poll, node, &pmd->poll_list) {
         poll_list[i++] = *poll;
     }
-    ovs_mutex_unlock(&pmd->poll_mutex);
+
+    pmd_load_cached_ports(pmd);
+
+    ovs_mutex_unlock(&pmd->port_mutex);
 
     *ppoll_list = poll_list;
     return i;
@@ -2686,7 +2749,7 @@  pmd_thread_main(void *f_)
     /* Stores the pmd thread's 'pmd' to 'per_pmd_key'. */
     ovsthread_setspecific(pmd->dp->per_pmd_key, pmd);
     pmd_thread_setaffinity_cpu(pmd->core_id);
-    poll_cnt = pmd_load_queues(pmd, &poll_list);
+    poll_cnt = pmd_load_queues_and_ports(pmd, &poll_list);
 reload:
     emc_cache_init(&pmd->flow_cache);
 
@@ -2719,7 +2782,7 @@  reload:
         }
     }
 
-    poll_cnt = pmd_load_queues(pmd, &poll_list);
+    poll_cnt = pmd_load_queues_and_ports(pmd, &poll_list);
     exiting = latch_is_set(&pmd->exit_latch);
     /* Signal here to make sure the pmd finishes
      * reloading the updated configuration. */
@@ -2732,6 +2795,7 @@  reload:
     }
 
     free(poll_list);
+    pmd_free_cached_ports(pmd);
     return NULL;
 }
 
@@ -2797,9 +2861,16 @@  static void
 dp_netdev_set_nonpmd(struct dp_netdev *dp)
 {
     struct dp_netdev_pmd_thread *non_pmd;
+    struct dp_netdev_port *port;
 
     non_pmd = xzalloc(sizeof *non_pmd);
     dp_netdev_configure_pmd(non_pmd, dp, NON_PMD_CORE_ID, OVS_NUMA_UNSPEC);
+
+    CMAP_FOR_EACH (port, node, &dp->ports) {
+        dp_netdev_add_port_tx_to_pmd(non_pmd, port);
+    }
+
+    dp_netdev_reload_pmd__(non_pmd);
 }
 
 /* Caller must have valid pointer to 'pmd'. */
@@ -2858,10 +2929,12 @@  dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
     xpthread_cond_init(&pmd->cond, NULL);
     ovs_mutex_init(&pmd->cond_mutex);
     ovs_mutex_init(&pmd->flow_mutex);
-    ovs_mutex_init(&pmd->poll_mutex);
+    ovs_mutex_init(&pmd->port_mutex);
     dpcls_init(&pmd->cls);
     cmap_init(&pmd->flow_table);
     ovs_list_init(&pmd->poll_list);
+    hmap_init(&pmd->tx_ports);
+    hmap_init(&pmd->port_cache);
     /* init the 'flow_cache' since there is no
      * actual thread created for NON_PMD_CORE_ID. */
     if (core_id == NON_PMD_CORE_ID) {
@@ -2876,12 +2949,14 @@  dp_netdev_destroy_pmd(struct dp_netdev_pmd_thread *pmd)
 {
     dp_netdev_pmd_flow_flush(pmd);
     dpcls_destroy(&pmd->cls);
+    hmap_destroy(&pmd->port_cache);
+    hmap_destroy(&pmd->tx_ports);
     cmap_destroy(&pmd->flow_table);
     ovs_mutex_destroy(&pmd->flow_mutex);
     latch_destroy(&pmd->exit_latch);
     xpthread_cond_destroy(&pmd->cond);
     ovs_mutex_destroy(&pmd->cond_mutex);
-    ovs_mutex_destroy(&pmd->poll_mutex);
+    ovs_mutex_destroy(&pmd->port_mutex);
     free(pmd);
 }
 
@@ -2890,10 +2965,11 @@  dp_netdev_destroy_pmd(struct dp_netdev_pmd_thread *pmd)
 static void
 dp_netdev_del_pmd(struct dp_netdev *dp, struct dp_netdev_pmd_thread *pmd)
 {
-    /* Uninit the 'flow_cache' since there is
-     * no actual thread uninit it for NON_PMD_CORE_ID. */
+    /* NON_PMD_CORE_ID doesn't have a thread, so we don't have to synchronize,
+     * but extra cleanup is necessary */
     if (pmd->core_id == NON_PMD_CORE_ID) {
         emc_cache_uninit(&pmd->flow_cache);
+        pmd_free_cached_ports(pmd);
     } else {
         latch_set(&pmd->exit_latch);
         dp_netdev_reload_pmd__(pmd);
@@ -2901,8 +2977,7 @@  dp_netdev_del_pmd(struct dp_netdev *dp, struct dp_netdev_pmd_thread *pmd)
         xpthread_join(pmd->thread, NULL);
     }
 
-    /* Unref all ports and free poll_list. */
-    dp_netdev_pmd_clear_poll_list(pmd);
+    dp_netdev_pmd_clear_ports(pmd);
 
     /* Purges the 'pmd''s flows after stopping the thread, but before
      * destroying the flows, so that the flow stats can be collected. */
@@ -2985,30 +3060,51 @@  dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int numa_id)
     free(free_idx);
 }
 
-/* Deletes all rx queues from pmd->poll_list. */
+/* Deletes all rx queues from pmd->poll_list and all the ports from
+ * pmd->tx_ports. */
 static void
-dp_netdev_pmd_clear_poll_list(struct dp_netdev_pmd_thread *pmd)
+dp_netdev_pmd_clear_ports(struct dp_netdev_pmd_thread *pmd)
 {
     struct rxq_poll *poll;
+    struct tx_port *port;
 
-    ovs_mutex_lock(&pmd->poll_mutex);
+    ovs_mutex_lock(&pmd->port_mutex);
     LIST_FOR_EACH_POP (poll, node, &pmd->poll_list) {
         free(poll);
     }
     pmd->poll_cnt = 0;
-    ovs_mutex_unlock(&pmd->poll_mutex);
+    HMAP_FOR_EACH_POP (port, node, &pmd->tx_ports) {
+        free(port);
+    }
+    ovs_mutex_unlock(&pmd->port_mutex);
 }
 
-/* Deletes all rx queues of 'port' from poll_list of pmd thread.  Returns true
- * if 'port' was found in 'pmd' (therefore a restart is required). */
+static struct tx_port *
+tx_port_lookup(const struct hmap *hmap, odp_port_t port_no)
+{
+    struct tx_port *tx;
+
+    HMAP_FOR_EACH_IN_BUCKET (tx, node, hash_port_no(port_no), hmap) {
+        if (tx->port_no == port_no) {
+            return tx;
+        }
+    }
+
+    return NULL;
+}
+
+/* Deletes all rx queues of 'port' from 'poll_list', and the 'port' from
+ * 'tx_ports' of 'pmd' thread.  Returns true if 'port' was found in 'pmd'
+ * (therefore a restart is required). */
 static bool
 dp_netdev_del_port_from_pmd__(struct dp_netdev_port *port,
                               struct dp_netdev_pmd_thread *pmd)
 {
     struct rxq_poll *poll, *next;
+    struct tx_port *tx;
     bool found = false;
 
-    ovs_mutex_lock(&pmd->poll_mutex);
+    ovs_mutex_lock(&pmd->port_mutex);
     LIST_FOR_EACH_SAFE (poll, next, node, &pmd->poll_list) {
         if (poll->port == port) {
             found = true;
@@ -3017,36 +3113,41 @@  dp_netdev_del_port_from_pmd__(struct dp_netdev_port *port,
             free(poll);
         }
     }
-    ovs_mutex_unlock(&pmd->poll_mutex);
+
+    tx = tx_port_lookup(&pmd->tx_ports, port->port_no);
+    if (tx) {
+        hmap_remove(&pmd->tx_ports, &tx->node);
+        free(tx);
+        found = true;
+    }
+    ovs_mutex_unlock(&pmd->port_mutex);
 
     return found;
 }
 
-/* Deletes all rx queues of 'port' from all pmd threads.  The pmd threads that
- * need to be restarted are inserted in 'to_reload'. */
+/* Deletes 'port' from the 'poll_list' and from the 'tx_ports' of all the pmd
+ * threads.  The pmd threads that need to be restarted are inserted in
+ * 'to_reload'. */
 static void
 dp_netdev_del_port_from_all_pmds__(struct dp_netdev *dp,
                                    struct dp_netdev_port *port,
                                    struct hmapx *to_reload)
 {
-    int numa_id = netdev_get_numa_id(port->netdev);
     struct dp_netdev_pmd_thread *pmd;
 
     CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
-        if (pmd->numa_id == numa_id) {
-            bool found;
+        bool found;
 
-            found = dp_netdev_del_port_from_pmd__(port, pmd);
+        found = dp_netdev_del_port_from_pmd__(port, pmd);
 
-            if (found) {
-                hmapx_add(to_reload, pmd);
-            }
-       }
+        if (found) {
+            hmapx_add(to_reload, pmd);
+        }
     }
 }
 
-/* Deletes all rx queues of 'port' from all pmd threads of dp and
- * reloads them if needed. */
+/* Deletes 'port' from the 'poll_list' and from the 'tx_ports' of all the pmd
+ * threads. Reloads the threads if needed. */
 static void
 dp_netdev_del_port_from_all_pmds(struct dp_netdev *dp,
                                  struct dp_netdev_port *port)
@@ -3090,7 +3191,7 @@  dp_netdev_less_loaded_pmd_on_numa(struct dp_netdev *dp, int numa_id)
 static void
 dp_netdev_add_rxq_to_pmd(struct dp_netdev_pmd_thread *pmd,
                          struct dp_netdev_port *port, struct netdev_rxq *rx)
-    OVS_REQUIRES(pmd->poll_mutex)
+    OVS_REQUIRES(pmd->port_mutex)
 {
     struct rxq_poll *poll = xmalloc(sizeof *poll);
 
@@ -3101,38 +3202,72 @@  dp_netdev_add_rxq_to_pmd(struct dp_netdev_pmd_thread *pmd,
     pmd->poll_cnt++;
 }
 
-/* Distributes all rx queues of 'port' between all PMD threads in 'dp'. The
- * pmd threads that need to be restarted are inserted in 'to_reload'. */
+/* Adds 'port' to the 'tx_ports' map of 'pmd', which must be reloaded for the
+ * change to reach its 'port_cache'. */
 static void
-dp_netdev_add_port_to_pmds__(struct dp_netdev *dp, struct dp_netdev_port *port,
-                             struct hmapx *to_reload)
+dp_netdev_add_port_tx_to_pmd(struct dp_netdev_pmd_thread *pmd,
+                             struct dp_netdev_port *port)
+{
+    struct tx_port *tx = xzalloc(sizeof *tx);
+
+    tx->netdev = port->netdev;
+    tx->port_no = port->port_no;
+
+    ovs_mutex_lock(&pmd->port_mutex);
+    hmap_insert(&pmd->tx_ports, &tx->node, hash_port_no(tx->port_no));
+    ovs_mutex_unlock(&pmd->port_mutex);
+}
+
+/* Distribute all rx queues of 'port' between PMD threads in 'dp'. The pmd
+ * threads that need to be restarted are inserted in 'to_reload'. */
+static void
+dp_netdev_add_port_rx_to_pmds(struct dp_netdev *dp,
+                              struct dp_netdev_port *port,
+                              struct hmapx *to_reload)
 {
     int numa_id = netdev_get_numa_id(port->netdev);
-    struct dp_netdev_pmd_thread *pmd;
     int i;
 
-    /* Cannot create pmd threads for invalid numa node. */
-    ovs_assert(ovs_numa_numa_id_is_valid(numa_id));
+    if (!netdev_is_pmd(port->netdev)) {
+        return;
+    }
 
     for (i = 0; i < port->n_rxq; i++) {
+        struct dp_netdev_pmd_thread *pmd;
+
         pmd = dp_netdev_less_loaded_pmd_on_numa(dp, numa_id);
         if (!pmd) {
-            /* There is no pmd threads on this numa node. */
-            dp_netdev_set_pmds_on_numa(dp, numa_id);
-            /* Assigning of rx queues done. */
+            VLOG_WARN("There's no pmd thread on numa node %d", numa_id);
             break;
         }
 
-        ovs_mutex_lock(&pmd->poll_mutex);
+        ovs_mutex_lock(&pmd->port_mutex);
         dp_netdev_add_rxq_to_pmd(pmd, port, port->rxq[i]);
-        ovs_mutex_unlock(&pmd->poll_mutex);
+        ovs_mutex_unlock(&pmd->port_mutex);
 
         hmapx_add(to_reload, pmd);
     }
 }
 
 /* Distributes all rx queues of 'port' between all PMD threads in 'dp' and
- * reloads them, if needed. */
+ * inserts 'port' in the PMD threads 'tx_ports'. The pmd threads that need to
+ * be restarted are inserted in 'to_reload'. */
+static void
+dp_netdev_add_port_to_pmds__(struct dp_netdev *dp, struct dp_netdev_port *port,
+                             struct hmapx *to_reload)
+{
+    struct dp_netdev_pmd_thread *pmd;
+
+    dp_netdev_add_port_rx_to_pmds(dp, port, to_reload);
+
+    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+        dp_netdev_add_port_tx_to_pmd(pmd, port);
+        hmapx_add(to_reload, pmd);
+    }
+}
+
+/* Distributes all rx queues of 'port' between all PMD threads in 'dp', inserts
+ * 'port' in the PMD threads 'tx_ports' and reloads them, if needed. */
 static void
 dp_netdev_add_port_to_pmds(struct dp_netdev *dp, struct dp_netdev_port *port)
 {
@@ -3150,17 +3285,17 @@  dp_netdev_add_port_to_pmds(struct dp_netdev *dp, struct dp_netdev_port *port)
     hmapx_destroy(&to_reload);
 }
 
-/* Checks the numa node id of 'netdev' and starts pmd threads for
- * the numa node. */
+/* Starts pmd threads for the numa node 'numa_id', if not already started.
+ * The function takes care of filling the new threads' 'tx_ports'. */
 static void
 dp_netdev_set_pmds_on_numa(struct dp_netdev *dp, int numa_id)
 {
     int n_pmds;
 
     if (!ovs_numa_numa_id_is_valid(numa_id)) {
-        VLOG_ERR("Cannot create pmd threads due to numa id (%d)"
-                 "invalid", numa_id);
-        return ;
+        VLOG_WARN("Cannot create pmd threads due to numa id (%d) invalid",
+                  numa_id);
+        return;
     }
 
     n_pmds = get_n_pmd_threads_on_numa(dp, numa_id);
@@ -3169,46 +3304,31 @@  dp_netdev_set_pmds_on_numa(struct dp_netdev *dp, int numa_id)
      * in which 'netdev' is on, do nothing.  Else, creates the
      * pmd threads for the numa node. */
     if (!n_pmds) {
-        int can_have, n_unpinned, i, index = 0;
-        struct dp_netdev_pmd_thread **pmds;
-        struct dp_netdev_port *port;
+        int can_have, n_unpinned, i;
 
         n_unpinned = ovs_numa_get_n_unpinned_cores_on_numa(numa_id);
         if (!n_unpinned) {
-            VLOG_ERR("Cannot create pmd threads due to out of unpinned "
-                     "cores on numa node %d", numa_id);
+            VLOG_WARN("Cannot create pmd threads due to out of unpinned "
+                      "cores on numa node %d", numa_id);
             return;
         }
 
         /* If cpu mask is specified, uses all unpinned cores, otherwise
          * tries creating NR_PMD_THREADS pmd threads. */
         can_have = dp->pmd_cmask ? n_unpinned : MIN(n_unpinned, NR_PMD_THREADS);
-        pmds = xzalloc(can_have * sizeof *pmds);
         for (i = 0; i < can_have; i++) {
             unsigned core_id = ovs_numa_get_unpinned_core_on_numa(numa_id);
-            pmds[i] = xzalloc(sizeof **pmds);
-            dp_netdev_configure_pmd(pmds[i], dp, core_id, numa_id);
-        }
+            struct dp_netdev_pmd_thread *pmd = xzalloc(sizeof *pmd);
+            struct dp_netdev_port *port;
 
-        /* Distributes rx queues of this numa node between new pmd threads. */
-        CMAP_FOR_EACH (port, node, &dp->ports) {
-            if (netdev_is_pmd(port->netdev)
-                && netdev_get_numa_id(port->netdev) == numa_id) {
-                for (i = 0; i < port->n_rxq; i++) {
-                    /* Make thread-safety analyser happy. */
-                    ovs_mutex_lock(&pmds[index]->poll_mutex);
-                    dp_netdev_add_rxq_to_pmd(pmds[index], port, port->rxq[i]);
-                    ovs_mutex_unlock(&pmds[index]->poll_mutex);
-                    index = (index + 1) % can_have;
-                }
+            dp_netdev_configure_pmd(pmd, dp, core_id, numa_id);
+
+            CMAP_FOR_EACH (port, node, &dp->ports) {
+                dp_netdev_add_port_tx_to_pmd(pmd, port);
             }
-        }
 
-        /* Actual start of pmd threads. */
-        for (i = 0; i < can_have; i++) {
-            pmds[i]->thread = ovs_thread_create("pmd", pmd_thread_main, pmds[i]);
+            pmd->thread = ovs_thread_create("pmd", pmd_thread_main, pmd);
         }
-        free(pmds);
         VLOG_INFO("Created %d pmd threads on numa node %d", can_have, numa_id);
     }
 }
@@ -3219,7 +3339,10 @@  dp_netdev_set_pmds_on_numa(struct dp_netdev *dp, int numa_id)
 static void
 dp_netdev_reset_pmd_threads(struct dp_netdev *dp)
 {
+    struct hmapx to_reload = HMAPX_INITIALIZER(&to_reload);
+    struct dp_netdev_pmd_thread *pmd;
     struct dp_netdev_port *port;
+    struct hmapx_node *node;
 
     CMAP_FOR_EACH (port, node, &dp->ports) {
         if (netdev_is_pmd(port->netdev)) {
@@ -3227,7 +3350,15 @@  dp_netdev_reset_pmd_threads(struct dp_netdev *dp)
 
             dp_netdev_set_pmds_on_numa(dp, numa_id);
         }
+        dp_netdev_add_port_rx_to_pmds(dp, port, &to_reload);
+    }
+
+    HMAPX_FOR_EACH (node, &to_reload) {
+        pmd = (struct dp_netdev_pmd_thread *) node->data;
+        dp_netdev_reload_pmd__(pmd);
     }
+
+    hmapx_destroy(&to_reload);
 }
 
 static char *
@@ -3719,6 +3850,13 @@  dpif_netdev_register_upcall_cb(struct dpif *dpif, upcall_callback *cb,
     dp->upcall_cb = cb;
 }
 
+static struct tx_port *
+pmd_tx_port_cache_lookup(const struct dp_netdev_pmd_thread *pmd,
+                         odp_port_t port_no)
+{
+    return tx_port_lookup(&pmd->port_cache, port_no);
+}
+
 static void
 dp_netdev_drop_packets(struct dp_packet **packets, int cnt, bool may_steal)
 {
@@ -3732,16 +3870,16 @@  dp_netdev_drop_packets(struct dp_packet **packets, int cnt, bool may_steal)
 }
 
 static int
-push_tnl_action(const struct dp_netdev *dp,
-                   const struct nlattr *attr,
-                   struct dp_packet **packets, int cnt)
+push_tnl_action(const struct dp_netdev_pmd_thread *pmd,
+                const struct nlattr *attr,
+                struct dp_packet **packets, int cnt)
 {
-    struct dp_netdev_port *tun_port;
+    struct tx_port *tun_port;
     const struct ovs_action_push_tnl *data;
 
     data = nl_attr_get(attr);
 
-    tun_port = dp_netdev_lookup_port(dp, u32_to_odp(data->tnl_port));
+    tun_port = pmd_tx_port_cache_lookup(pmd, u32_to_odp(data->tnl_port));
     if (!tun_port) {
         return -EINVAL;
     }
@@ -3771,12 +3909,12 @@  dp_execute_cb(void *aux_, struct dp_packet **packets, int cnt,
     struct dp_netdev_pmd_thread *pmd = aux->pmd;
     struct dp_netdev *dp = pmd->dp;
     int type = nl_attr_type(a);
-    struct dp_netdev_port *p;
+    struct tx_port *p;
     int i;
 
     switch ((enum ovs_action_attr)type) {
     case OVS_ACTION_ATTR_OUTPUT:
-        p = dp_netdev_lookup_port(dp, u32_to_odp(nl_attr_get_u32(a)));
+        p = pmd_tx_port_cache_lookup(pmd, u32_to_odp(nl_attr_get_u32(a)));
         if (OVS_LIKELY(p)) {
             int tx_qid;
 
@@ -3797,7 +3935,7 @@  dp_execute_cb(void *aux_, struct dp_packet **packets, int cnt,
                 packets = tnl_pkt;
             }
 
-            err = push_tnl_action(dp, a, packets, cnt);
+            err = push_tnl_action(pmd, a, packets, cnt);
             if (!err) {
                 (*depth)++;
                 dp_netdev_recirculate(pmd, packets, cnt);
@@ -3813,7 +3951,7 @@  dp_execute_cb(void *aux_, struct dp_packet **packets, int cnt,
         if (*depth < MAX_RECIRC_DEPTH) {
             odp_port_t portno = u32_to_odp(nl_attr_get_u32(a));
 
-            p = dp_netdev_lookup_port(dp, portno);
+            p = pmd_tx_port_cache_lookup(pmd, portno);
             if (p) {
                 struct dp_packet *tnl_pkt[NETDEV_MAX_BURST];
                 int err;
@@ -4016,12 +4154,14 @@  dpif_dummy_change_port_number(struct unixctl_conn *conn, int argc OVS_UNUSED,
 
     /* Remove old port. */
     cmap_remove(&dp->ports, &old_port->node, hash_port_no(old_port->port_no));
+    dp_netdev_del_port_from_all_pmds(dp, old_port);
     ovsrcu_postpone(free, old_port);
 
     /* Insert new port (cmap semantics mean we cannot re-insert 'old_port'). */
     new_port = xmemdup(old_port, sizeof *old_port);
     new_port->port_no = port_no;
     cmap_insert(&dp->ports, &new_port->node, hash_port_no(port_no));
+    dp_netdev_add_port_to_pmds(dp, new_port);
 
     seq_change(dp->port_seq);
     unixctl_command_reply(conn, NULL);