diff mbox

[ovs-dev,v5,1/4] dpif-netdev: XPS (Transmit Packet Steering) implementation.

Message ID 1469630684-9950-2-git-send-email-i.maximets@samsung.com
State Accepted
Delegated to: Daniele Di Proietto
Headers show

Commit Message

Ilya Maximets July 27, 2016, 2:44 p.m. UTC
If CPU number in pmd-cpu-mask is not divisible by the number of queues and
in a few more complex situations there may be unfair distribution of TX
queue-ids between PMD threads.

For example, if we have 2 ports with 4 queues and 6 CPUs in pmd-cpu-mask
such distribution is possible:
<------------------------------------------------------------------------>
pmd thread numa_id 0 core_id 13:
        port: vhost-user1       queue-id: 1
        port: dpdk0     queue-id: 3
pmd thread numa_id 0 core_id 14:
        port: vhost-user1       queue-id: 2
pmd thread numa_id 0 core_id 16:
        port: dpdk0     queue-id: 0
pmd thread numa_id 0 core_id 17:
        port: dpdk0     queue-id: 1
pmd thread numa_id 0 core_id 12:
        port: vhost-user1       queue-id: 0
        port: dpdk0     queue-id: 2
pmd thread numa_id 0 core_id 15:
        port: vhost-user1       queue-id: 3
<------------------------------------------------------------------------>

As we can see above dpdk0 port polled by threads on cores:
	12, 13, 16 and 17.

By design of dpif-netdev, there is only one TX queue-id assigned to each
pmd thread. This queue-id's are sequential similar to core-id's. And
thread will send packets to queue with exact this queue-id regardless
of port.

In previous example:

	pmd thread on core 12 will send packets to tx queue 0
	pmd thread on core 13 will send packets to tx queue 1
	...
	pmd thread on core 17 will send packets to tx queue 5

So, for dpdk0 port after truncating in netdev-dpdk:

	core 12 --> TX queue-id 0 % 4 == 0
	core 13 --> TX queue-id 1 % 4 == 1
	core 16 --> TX queue-id 4 % 4 == 0
	core 17 --> TX queue-id 5 % 4 == 1

As a result only 2 of 4 queues used.

To fix this issue some kind of XPS implemented in following way:

	* TX queue-ids are allocated dynamically.
	* When PMD thread first time tries to send packets to new port
	  it allocates less used TX queue for this port.
	* PMD threads periodically performes revalidation of
	  allocated TX queue-ids. If queue wasn't used in last
	  XPS_TIMEOUT_MS milliseconds it will be freed while revalidation.
        * XPS is not working if we have enough TX queues.

Reported-by: Zhihong Wang <zhihong.wang@intel.com>
Signed-off-by: Ilya Maximets <i.maximets@samsung.com>
---
 lib/dpif-netdev.c     | 204 ++++++++++++++++++++++++++++++++++++++++----------
 lib/netdev-bsd.c      |   3 +-
 lib/netdev-dpdk.c     |  32 +++-----
 lib/netdev-dummy.c    |   3 +-
 lib/netdev-linux.c    |   3 +-
 lib/netdev-provider.h |  11 +--
 lib/netdev.c          |  13 ++--
 lib/netdev.h          |   2 +-
 8 files changed, 198 insertions(+), 73 deletions(-)
diff mbox

Patch

diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index f05ca4e..d1ba6f3 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -248,6 +248,8 @@  enum pmd_cycles_counter_type {
     PMD_N_CYCLES
 };
 
+#define XPS_TIMEOUT_MS 500LL
+
 /* A port in a netdev-based datapath. */
 struct dp_netdev_port {
     odp_port_t port_no;
@@ -256,6 +258,9 @@  struct dp_netdev_port {
     struct netdev_saved_flags *sf;
     unsigned n_rxq;             /* Number of elements in 'rxq' */
     struct netdev_rxq **rxq;
+    atomic_bool dynamic_txqs;   /* If true XPS will be used. */
+    unsigned *txq_used;         /* Number of threads that uses each tx queue. */
+    struct ovs_mutex txq_used_mutex;
     char *type;                 /* Port type as requested by user. */
 };
 
@@ -384,8 +389,9 @@  struct rxq_poll {
 
 /* Contained by struct dp_netdev_pmd_thread's 'port_cache' or 'tx_ports'. */
 struct tx_port {
-    odp_port_t port_no;
-    struct netdev *netdev;
+    struct dp_netdev_port *port;
+    int qid;
+    long long last_used;
     struct hmap_node node;
 };
 
@@ -443,9 +449,10 @@  struct dp_netdev_pmd_thread {
     unsigned core_id;               /* CPU core id of this pmd thread. */
     int numa_id;                    /* numa node id of this pmd thread. */
 
-    /* Queue id used by this pmd thread to send packets on all netdevs.
-     * All tx_qid's are unique and less than 'ovs_numa_get_n_cores() + 1'. */
-    atomic_int tx_qid;
+    /* Queue id used by this pmd thread to send packets on all netdevs if
+     * XPS disabled for this netdev. All static_tx_qid's are unique and less
+     * than 'ovs_numa_get_n_cores() + 1'. */
+    atomic_int static_tx_qid;
 
     struct ovs_mutex port_mutex;    /* Mutex for 'poll_list' and 'tx_ports'. */
     /* List of rx queues to poll. */
@@ -498,7 +505,8 @@  static void dp_netdev_execute_actions(struct dp_netdev_pmd_thread *pmd,
                                       struct dp_packet_batch *,
                                       bool may_steal,
                                       const struct nlattr *actions,
-                                      size_t actions_len);
+                                      size_t actions_len,
+                                      long long now);
 static void dp_netdev_input(struct dp_netdev_pmd_thread *,
                             struct dp_packet_batch *, odp_port_t port_no);
 static void dp_netdev_recirculate(struct dp_netdev_pmd_thread *,
@@ -541,6 +549,12 @@  static void dp_netdev_pmd_flow_flush(struct dp_netdev_pmd_thread *pmd);
 static void pmd_load_cached_ports(struct dp_netdev_pmd_thread *pmd)
     OVS_REQUIRES(pmd->port_mutex);
 
+static void
+dpif_netdev_xps_revalidate_pmd(const struct dp_netdev_pmd_thread *pmd,
+                               long long now, bool purge);
+static int dpif_netdev_xps_get_tx_qid(const struct dp_netdev_pmd_thread *pmd,
+                                      struct tx_port *tx, long long now);
+
 static inline bool emc_entry_alive(struct emc_entry *ce);
 static void emc_clear_entry(struct emc_entry *ce);
 
@@ -1138,6 +1152,7 @@  port_create(const char *devname, const char *open_type, const char *type,
     struct netdev *netdev;
     int n_open_rxqs = 0;
     int i, error;
+    bool dynamic_txqs = false;
 
     *portp = NULL;
 
@@ -1171,6 +1186,9 @@  port_create(const char *devname, const char *open_type, const char *type,
             VLOG_ERR("%s, cannot set multiq", devname);
             goto out;
         }
+        if (netdev_n_txq(netdev) < n_cores + 1) {
+            dynamic_txqs = true;
+        }
     }
 
     if (netdev_is_reconf_required(netdev)) {
@@ -1185,7 +1203,10 @@  port_create(const char *devname, const char *open_type, const char *type,
     port->netdev = netdev;
     port->n_rxq = netdev_n_rxq(netdev);
     port->rxq = xcalloc(port->n_rxq, sizeof *port->rxq);
+    port->txq_used = xcalloc(netdev_n_txq(netdev), sizeof *port->txq_used);
     port->type = xstrdup(type);
+    ovs_mutex_init(&port->txq_used_mutex);
+    atomic_init(&port->dynamic_txqs, dynamic_txqs);
 
     for (i = 0; i < port->n_rxq; i++) {
         error = netdev_rxq_open(netdev, &port->rxq[i], i);
@@ -1211,7 +1232,9 @@  out_rxq_close:
     for (i = 0; i < n_open_rxqs; i++) {
         netdev_rxq_close(port->rxq[i]);
     }
+    ovs_mutex_destroy(&port->txq_used_mutex);
     free(port->type);
+    free(port->txq_used);
     free(port->rxq);
     free(port);
 
@@ -1351,7 +1374,8 @@  port_destroy(struct dp_netdev_port *port)
     for (unsigned i = 0; i < port->n_rxq; i++) {
         netdev_rxq_close(port->rxq[i]);
     }
-
+    ovs_mutex_destroy(&port->txq_used_mutex);
+    free(port->txq_used);
     free(port->rxq);
     free(port->type);
     free(port);
@@ -2476,7 +2500,7 @@  dpif_netdev_execute(struct dpif *dpif, struct dpif_execute *execute)
 
     packet_batch_init_packet(&pp, execute->packet);
     dp_netdev_execute_actions(pmd, &pp, false, execute->actions,
-                              execute->actions_len);
+                              execute->actions_len, time_msec());
 
     if (pmd->core_id == NON_PMD_CORE_ID) {
         ovs_mutex_unlock(&dp->non_pmd_mutex);
@@ -2650,6 +2674,10 @@  port_reconfigure(struct dp_netdev_port *port)
     }
     /* If the netdev_reconfigure() above succeeds, reopens the 'rxq's. */
     port->rxq = xrealloc(port->rxq, sizeof *port->rxq * netdev_n_rxq(netdev));
+    /* Realloc 'used' counters for tx queues. */
+    free(port->txq_used);
+    port->txq_used = xcalloc(netdev_n_txq(netdev), sizeof *port->txq_used);
+
     for (i = 0; i < netdev_n_rxq(netdev); i++) {
         err = netdev_rxq_open(netdev, &port->rxq[i], i);
         if (err) {
@@ -2666,9 +2694,21 @@  reconfigure_pmd_threads(struct dp_netdev *dp)
     OVS_REQUIRES(dp->port_mutex)
 {
     struct dp_netdev_port *port, *next;
+    int n_cores;
 
     dp_netdev_destroy_all_pmds(dp);
 
+    /* Reconfigures the cpu mask. */
+    ovs_numa_set_cpu_mask(dp->requested_pmd_cmask);
+    free(dp->pmd_cmask);
+    dp->pmd_cmask = nullable_xstrdup(dp->requested_pmd_cmask);
+
+    n_cores = ovs_numa_get_n_cores();
+    if (n_cores == OVS_CORE_UNSPEC) {
+        VLOG_ERR("Cannot get cpu core info");
+        return;
+    }
+
     HMAP_FOR_EACH_SAFE (port, next, node, &dp->ports) {
         int err;
 
@@ -2677,13 +2717,11 @@  reconfigure_pmd_threads(struct dp_netdev *dp)
             hmap_remove(&dp->ports, &port->node);
             seq_change(dp->port_seq);
             port_destroy(port);
+        } else {
+            atomic_init(&port->dynamic_txqs,
+                        netdev_n_txq(port->netdev) < n_cores + 1);
         }
     }
-    /* Reconfigures the cpu mask. */
-    ovs_numa_set_cpu_mask(dp->requested_pmd_cmask);
-    free(dp->pmd_cmask);
-    dp->pmd_cmask = nullable_xstrdup(dp->requested_pmd_cmask);
-
     /* Restores the non-pmd. */
     dp_netdev_set_nonpmd(dp);
     /* Restores all pmd threads. */
@@ -2727,6 +2765,7 @@  dpif_netdev_run(struct dpif *dpif)
             }
         }
     }
+    dpif_netdev_xps_revalidate_pmd(non_pmd, time_msec(), false);
     ovs_mutex_unlock(&dp->non_pmd_mutex);
 
     dp_netdev_pmd_unref(non_pmd);
@@ -2776,6 +2815,9 @@  pmd_free_cached_ports(struct dp_netdev_pmd_thread *pmd)
 {
     struct tx_port *tx_port_cached;
 
+    /* Free all used tx queue ids. */
+    dpif_netdev_xps_revalidate_pmd(pmd, 0, true);
+
     HMAP_FOR_EACH_POP (tx_port_cached, node, &pmd->port_cache) {
         free(tx_port_cached);
     }
@@ -2795,7 +2837,7 @@  pmd_load_cached_ports(struct dp_netdev_pmd_thread *pmd)
     HMAP_FOR_EACH (tx_port, node, &pmd->tx_ports) {
         tx_port_cached = xmemdup(tx_port, sizeof *tx_port_cached);
         hmap_insert(&pmd->port_cache, &tx_port_cached->node,
-                    hash_port_no(tx_port_cached->port_no));
+                    hash_port_no(tx_port_cached->port->port_no));
     }
 }
 
@@ -3011,7 +3053,7 @@  dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
     pmd->numa_id = numa_id;
     pmd->poll_cnt = 0;
 
-    atomic_init(&pmd->tx_qid,
+    atomic_init(&pmd->static_tx_qid,
                 (core_id == NON_PMD_CORE_ID)
                 ? ovs_numa_get_n_cores()
                 : get_n_pmd_threads(dp));
@@ -3107,7 +3149,7 @@  dp_netdev_destroy_all_pmds(struct dp_netdev *dp)
 }
 
 /* Deletes all pmd threads on numa node 'numa_id' and
- * fixes tx_qids of other threads to keep them sequential. */
+ * fixes static_tx_qids of other threads to keep them sequential. */
 static void
 dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int numa_id)
 {
@@ -3125,7 +3167,7 @@  dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int numa_id)
          * 'dp->poll_threads' (while we're iterating it) and it
          * might quiesce. */
         if (pmd->numa_id == numa_id) {
-            atomic_read_relaxed(&pmd->tx_qid, &free_idx[k]);
+            atomic_read_relaxed(&pmd->static_tx_qid, &free_idx[k]);
             pmd_list[k] = pmd;
             ovs_assert(k < n_pmds_on_numa);
             k++;
@@ -3140,12 +3182,12 @@  dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int numa_id)
     CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
         int old_tx_qid;
 
-        atomic_read_relaxed(&pmd->tx_qid, &old_tx_qid);
+        atomic_read_relaxed(&pmd->static_tx_qid, &old_tx_qid);
 
         if (old_tx_qid >= n_pmds) {
             int new_tx_qid = free_idx[--k];
 
-            atomic_store_relaxed(&pmd->tx_qid, new_tx_qid);
+            atomic_store_relaxed(&pmd->static_tx_qid, new_tx_qid);
         }
     }
 
@@ -3178,7 +3220,7 @@  tx_port_lookup(const struct hmap *hmap, odp_port_t port_no)
     struct tx_port *tx;
 
     HMAP_FOR_EACH_IN_BUCKET (tx, node, hash_port_no(port_no), hmap) {
-        if (tx->port_no == port_no) {
+        if (tx->port->port_no == port_no) {
             return tx;
         }
     }
@@ -3303,11 +3345,11 @@  dp_netdev_add_port_tx_to_pmd(struct dp_netdev_pmd_thread *pmd,
 {
     struct tx_port *tx = xzalloc(sizeof *tx);
 
-    tx->netdev = port->netdev;
-    tx->port_no = port->port_no;
+    tx->port = port;
+    tx->qid = -1;
 
     ovs_mutex_lock(&pmd->port_mutex);
-    hmap_insert(&pmd->tx_ports, &tx->node, hash_port_no(tx->port_no));
+    hmap_insert(&pmd->tx_ports, &tx->node, hash_port_no(tx->port->port_no));
     ovs_mutex_unlock(&pmd->port_mutex);
 }
 
@@ -3648,7 +3690,7 @@  packet_batch_per_flow_execute(struct packet_batch_per_flow *batch,
     actions = dp_netdev_flow_get_actions(flow);
 
     dp_netdev_execute_actions(pmd, &batch->array, true,
-                              actions->actions, actions->size);
+                              actions->actions, actions->size, now);
 }
 
 static inline void
@@ -3736,7 +3778,7 @@  static inline void
 handle_packet_upcall(struct dp_netdev_pmd_thread *pmd, struct dp_packet *packet,
                      const struct netdev_flow_key *key,
                      struct ofpbuf *actions, struct ofpbuf *put_actions,
-                     int *lost_cnt)
+                     int *lost_cnt, long long now)
 {
     struct ofpbuf *add_actions;
     struct dp_packet_batch b;
@@ -3775,7 +3817,7 @@  handle_packet_upcall(struct dp_netdev_pmd_thread *pmd, struct dp_packet *packet,
      * we'll send the packet up twice. */
     packet_batch_init_packet(&b, packet);
     dp_netdev_execute_actions(pmd, &b, true,
-                              actions->data, actions->size);
+                              actions->data, actions->size, now);
 
     add_actions = put_actions->size ? put_actions : actions;
     if (OVS_LIKELY(error != ENOSPC)) {
@@ -3804,7 +3846,8 @@  static inline void
 fast_path_processing(struct dp_netdev_pmd_thread *pmd,
                      struct dp_packet_batch *packets_,
                      struct netdev_flow_key *keys,
-                     struct packet_batch_per_flow batches[], size_t *n_batches)
+                     struct packet_batch_per_flow batches[], size_t *n_batches,
+                     long long now)
 {
     int cnt = packets_->count;
 #if !defined(__CHECKER__) && !defined(_WIN32)
@@ -3850,8 +3893,8 @@  fast_path_processing(struct dp_netdev_pmd_thread *pmd,
             }
 
             miss_cnt++;
-            handle_packet_upcall(pmd, packets[i], &keys[i], &actions, &put_actions,
-                          &lost_cnt);
+            handle_packet_upcall(pmd, packets[i], &keys[i], &actions,
+                                 &put_actions, &lost_cnt, now);
         }
 
         ofpbuf_uninit(&actions);
@@ -3915,7 +3958,7 @@  dp_netdev_input__(struct dp_netdev_pmd_thread *pmd,
                             md_is_valid, port_no);
     if (OVS_UNLIKELY(newcnt)) {
         packets->count = newcnt;
-        fast_path_processing(pmd, packets, keys, batches, &n_batches);
+        fast_path_processing(pmd, packets, keys, batches, &n_batches, now);
     }
 
     for (i = 0; i < n_batches; i++) {
@@ -3944,6 +3987,7 @@  dp_netdev_recirculate(struct dp_netdev_pmd_thread *pmd,
 
 struct dp_netdev_execute_aux {
     struct dp_netdev_pmd_thread *pmd;
+    long long now;
 };
 
 static void
@@ -3964,6 +4008,79 @@  dpif_netdev_register_upcall_cb(struct dpif *dpif, upcall_callback *cb,
     dp->upcall_cb = cb;
 }
 
+static void
+dpif_netdev_xps_revalidate_pmd(const struct dp_netdev_pmd_thread *pmd,
+                               long long now, bool purge)
+{
+    struct tx_port *tx;
+    struct dp_netdev_port *port;
+    long long interval;
+    bool dynamic_txqs;
+
+    HMAP_FOR_EACH (tx, node, &pmd->port_cache) {
+        atomic_read_relaxed(&tx->port->dynamic_txqs, &dynamic_txqs);
+        if (dynamic_txqs) {
+            continue;
+        }
+        interval = now - tx->last_used;
+        if (tx->qid >= 0 && (purge || interval >= XPS_TIMEOUT_MS)) {
+            port = tx->port;
+            ovs_mutex_lock(&port->txq_used_mutex);
+            port->txq_used[tx->qid]--;
+            ovs_mutex_unlock(&port->txq_used_mutex);
+            tx->qid = -1;
+        }
+    }
+}
+
+static int
+dpif_netdev_xps_get_tx_qid(const struct dp_netdev_pmd_thread *pmd,
+                           struct tx_port *tx, long long now)
+{
+    struct dp_netdev_port *port;
+    long long interval;
+    int i, min_cnt, min_qid;
+
+    if (OVS_UNLIKELY(!now)) {
+        now = time_msec();
+    }
+
+    interval = now - tx->last_used;
+    tx->last_used = now;
+
+    if (OVS_LIKELY(tx->qid >= 0 && interval < XPS_TIMEOUT_MS)) {
+        return tx->qid;
+    }
+
+    port = tx->port;
+
+    ovs_mutex_lock(&port->txq_used_mutex);
+    if (tx->qid >= 0) {
+        port->txq_used[tx->qid]--;
+        tx->qid = -1;
+    }
+
+    min_cnt = -1;
+    min_qid = 0;
+    for (i = 0; i < netdev_n_txq(port->netdev); i++) {
+        if (port->txq_used[i] < min_cnt || min_cnt == -1) {
+            min_cnt = port->txq_used[i];
+            min_qid = i;
+        }
+    }
+
+    port->txq_used[min_qid]++;
+    tx->qid = min_qid;
+
+    ovs_mutex_unlock(&port->txq_used_mutex);
+
+    dpif_netdev_xps_revalidate_pmd(pmd, now, false);
+
+    VLOG_DBG("Core %d: New TX queue ID %d for port \'%s\'.",
+             pmd->core_id, tx->qid, netdev_get_name(tx->port->netdev));
+    return min_qid;
+}
+
 static struct tx_port *
 pmd_tx_port_cache_lookup(const struct dp_netdev_pmd_thread *pmd,
                          odp_port_t port_no)
@@ -3987,7 +4104,7 @@  push_tnl_action(const struct dp_netdev_pmd_thread *pmd,
         err = -EINVAL;
         goto error;
     }
-    err = netdev_push_header(tun_port->netdev, batch, data);
+    err = netdev_push_header(tun_port->port->netdev, batch, data);
     if (!err) {
         return 0;
     }
@@ -4001,7 +4118,7 @@  dp_execute_userspace_action(struct dp_netdev_pmd_thread *pmd,
                             struct dp_packet *packet, bool may_steal,
                             struct flow *flow, ovs_u128 *ufid,
                             struct ofpbuf *actions,
-                            const struct nlattr *userdata)
+                            const struct nlattr *userdata, long long now)
 {
     struct dp_packet_batch b;
     int error;
@@ -4014,7 +4131,7 @@  dp_execute_userspace_action(struct dp_netdev_pmd_thread *pmd,
     if (!error || error == ENOSPC) {
         packet_batch_init_packet(&b, packet);
         dp_netdev_execute_actions(pmd, &b, may_steal,
-                                  actions->data, actions->size);
+                                  actions->data, actions->size, now);
     } else if (may_steal) {
         dp_packet_delete(packet);
     }
@@ -4029,6 +4146,7 @@  dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
     struct dp_netdev_pmd_thread *pmd = aux->pmd;
     struct dp_netdev *dp = pmd->dp;
     int type = nl_attr_type(a);
+    long long now = aux->now;
     struct tx_port *p;
 
     switch ((enum ovs_action_attr)type) {
@@ -4036,10 +4154,17 @@  dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
         p = pmd_tx_port_cache_lookup(pmd, u32_to_odp(nl_attr_get_u32(a)));
         if (OVS_LIKELY(p)) {
             int tx_qid;
+            bool dynamic_txqs;
 
-            atomic_read_relaxed(&pmd->tx_qid, &tx_qid);
+            atomic_read_relaxed(&p->port->dynamic_txqs, &dynamic_txqs);
+            if (dynamic_txqs) {
+                tx_qid = dpif_netdev_xps_get_tx_qid(pmd, p, now);
+            } else {
+                atomic_read_relaxed(&pmd->static_tx_qid, &tx_qid);
+            }
 
-            netdev_send(p->netdev, tx_qid, packets_, may_steal);
+            netdev_send(p->port->netdev, tx_qid, packets_, may_steal,
+                        dynamic_txqs);
             return;
         }
         break;
@@ -4086,7 +4211,7 @@  dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
 
                 dp_packet_batch_apply_cutlen(packets_);
 
-                netdev_pop_header(p->netdev, packets_);
+                netdev_pop_header(p->port->netdev, packets_);
                 if (!packets_->count) {
                     return;
                 }
@@ -4134,7 +4259,7 @@  dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
                 flow_extract(packets[i], &flow);
                 dpif_flow_hash(dp->dpif, &flow, sizeof flow, &ufid);
                 dp_execute_userspace_action(pmd, packets[i], may_steal, &flow,
-                                            &ufid, &actions, userdata);
+                                            &ufid, &actions, userdata, now);
             }
 
             if (clone) {
@@ -4200,9 +4325,10 @@  static void
 dp_netdev_execute_actions(struct dp_netdev_pmd_thread *pmd,
                           struct dp_packet_batch *packets,
                           bool may_steal,
-                          const struct nlattr *actions, size_t actions_len)
+                          const struct nlattr *actions, size_t actions_len,
+                          long long now)
 {
-    struct dp_netdev_execute_aux aux = { pmd };
+    struct dp_netdev_execute_aux aux = { pmd, now };
 
     odp_execute_actions(&aux, packets, may_steal, actions,
                         actions_len, dp_execute_cb);
diff --git a/lib/netdev-bsd.c b/lib/netdev-bsd.c
index f963c6e..2f57c1a 100644
--- a/lib/netdev-bsd.c
+++ b/lib/netdev-bsd.c
@@ -680,7 +680,8 @@  netdev_bsd_rxq_drain(struct netdev_rxq *rxq_)
  */
 static int
 netdev_bsd_send(struct netdev *netdev_, int qid OVS_UNUSED,
-                struct dp_packet_batch *batch, bool may_steal)
+                struct dp_packet_batch *batch, bool may_steal,
+                bool concurrent_txq OVS_UNUSED)
 {
     struct netdev_bsd *dev = netdev_bsd_cast(netdev_);
     const char *name = netdev_get_name(netdev_);
diff --git a/lib/netdev-dpdk.c b/lib/netdev-dpdk.c
index af87f18..c208f32 100644
--- a/lib/netdev-dpdk.c
+++ b/lib/netdev-dpdk.c
@@ -298,7 +298,7 @@  struct dpdk_tx_queue {
     rte_spinlock_t tx_lock;        /* Protects the members and the NIC queue
                                     * from concurrent access.  It is used only
                                     * if the queue is shared among different
-                                    * pmd threads (see 'txq_needs_locking'). */
+                                    * pmd threads (see 'concurrent_txq'). */
     int map;                       /* Mapping of configured vhost-user queues
                                     * to enabled by guest. */
 };
@@ -349,13 +349,6 @@  struct netdev_dpdk {
     struct rte_eth_link link;
     int link_reset_cnt;
 
-    /* Caller of netdev_send() might want to use more txqs than the device has.
-     * For physical NICs, if the 'requested_n_txq' less or equal to 'up.n_txq',
-     * 'txq_needs_locking' is false, otherwise it is true and we will take a
-     * spinlock on transmission.  For vhost devices, 'requested_n_txq' is
-     * always true.  */
-    bool txq_needs_locking;
-
     /* virtio-net structure for vhost device */
     OVSRCU_TYPE(struct virtio_net *) virtio_dev;
 
@@ -778,10 +771,8 @@  netdev_dpdk_init(struct netdev *netdev, unsigned int port_no,
             goto unlock;
         }
         netdev_dpdk_alloc_txq(dev, netdev->n_txq);
-        dev->txq_needs_locking = netdev->n_txq < dev->requested_n_txq;
     } else {
         netdev_dpdk_alloc_txq(dev, OVS_VHOST_MAX_QUEUE_NUM);
-        dev->txq_needs_locking = true;
         /* Enable DPDK_DEV_VHOST device and set promiscuous mode flag. */
         dev->flags = NETDEV_UP | NETDEV_PROMISC;
     }
@@ -1468,7 +1459,7 @@  dpdk_do_tx_copy(struct netdev *netdev, int qid, struct dp_packet_batch *batch)
 static int
 netdev_dpdk_vhost_send(struct netdev *netdev, int qid,
                        struct dp_packet_batch *batch,
-                       bool may_steal)
+                       bool may_steal, bool concurrent_txq OVS_UNUSED)
 {
 
     if (OVS_UNLIKELY(batch->packets[0]->source != DPBUF_DPDK)) {
@@ -1484,9 +1475,10 @@  netdev_dpdk_vhost_send(struct netdev *netdev, int qid,
 
 static inline void
 netdev_dpdk_send__(struct netdev_dpdk *dev, int qid,
-                   struct dp_packet_batch *batch, bool may_steal)
+                   struct dp_packet_batch *batch, bool may_steal,
+                   bool concurrent_txq)
 {
-    if (OVS_UNLIKELY(dev->txq_needs_locking)) {
+    if (OVS_UNLIKELY(concurrent_txq)) {
         qid = qid % dev->up.n_txq;
         rte_spinlock_lock(&dev->tx_q[qid].tx_lock);
     }
@@ -1551,18 +1543,19 @@  netdev_dpdk_send__(struct netdev_dpdk *dev, int qid,
         }
     }
 
-    if (OVS_UNLIKELY(dev->txq_needs_locking)) {
+    if (OVS_UNLIKELY(concurrent_txq)) {
         rte_spinlock_unlock(&dev->tx_q[qid].tx_lock);
     }
 }
 
 static int
 netdev_dpdk_eth_send(struct netdev *netdev, int qid,
-                     struct dp_packet_batch *batch, bool may_steal)
+                     struct dp_packet_batch *batch, bool may_steal,
+                     bool concurrent_txq)
 {
     struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
 
-    netdev_dpdk_send__(dev, qid, batch, may_steal);
+    netdev_dpdk_send__(dev, qid, batch, may_steal, concurrent_txq);
     return 0;
 }
 
@@ -2533,7 +2526,8 @@  dpdk_ring_open(const char dev_name[], unsigned int *eth_port_id)
 
 static int
 netdev_dpdk_ring_send(struct netdev *netdev, int qid,
-                      struct dp_packet_batch *batch, bool may_steal)
+                      struct dp_packet_batch *batch, bool may_steal,
+                      bool concurrent_txq)
 {
     struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
     unsigned i;
@@ -2546,7 +2540,7 @@  netdev_dpdk_ring_send(struct netdev *netdev, int qid,
         dp_packet_rss_invalidate(batch->packets[i]);
     }
 
-    netdev_dpdk_send__(dev, qid, batch, may_steal);
+    netdev_dpdk_send__(dev, qid, batch, may_steal, concurrent_txq);
     return 0;
 }
 
@@ -2823,8 +2817,6 @@  netdev_dpdk_reconfigure(struct netdev *netdev)
     err = dpdk_eth_dev_init(dev);
     netdev_dpdk_alloc_txq(dev, netdev->n_txq);
 
-    dev->txq_needs_locking = netdev->n_txq < dev->requested_n_txq;
-
 out:
 
     ovs_mutex_unlock(&dev->mutex);
diff --git a/lib/netdev-dummy.c b/lib/netdev-dummy.c
index a95f7bb..813ce69 100644
--- a/lib/netdev-dummy.c
+++ b/lib/netdev-dummy.c
@@ -1034,7 +1034,8 @@  netdev_dummy_rxq_drain(struct netdev_rxq *rxq_)
 
 static int
 netdev_dummy_send(struct netdev *netdev, int qid OVS_UNUSED,
-                  struct dp_packet_batch *batch, bool may_steal)
+                  struct dp_packet_batch *batch, bool may_steal,
+                  bool concurrent_txq OVS_UNUSED)
 {
     struct netdev_dummy *dev = netdev_dummy_cast(netdev);
     int error = 0;
diff --git a/lib/netdev-linux.c b/lib/netdev-linux.c
index c71a3df..2da8c18 100644
--- a/lib/netdev-linux.c
+++ b/lib/netdev-linux.c
@@ -1161,7 +1161,8 @@  netdev_linux_rxq_drain(struct netdev_rxq *rxq_)
  * expected to do additional queuing of packets. */
 static int
 netdev_linux_send(struct netdev *netdev_, int qid OVS_UNUSED,
-                  struct dp_packet_batch *batch, bool may_steal)
+                  struct dp_packet_batch *batch, bool may_steal,
+                  bool concurrent_txq OVS_UNUSED)
 {
     int i;
     int error = 0;
diff --git a/lib/netdev-provider.h b/lib/netdev-provider.h
index 915a5a5..41fa9e7 100644
--- a/lib/netdev-provider.h
+++ b/lib/netdev-provider.h
@@ -303,10 +303,6 @@  struct netdev_class {
      * otherwise a positive errno value.
      *
      * 'n_txq' specifies the exact number of transmission queues to create.
-     * The caller will call netdev_send() concurrently from 'n_txq' different
-     * threads (with different qid).  The netdev provider is responsible for
-     * making sure that these concurrent calls do not create a race condition
-     * by using multiple hw queues or locking.
      *
      * The caller will call netdev_reconfigure() (if necessary) before using
      * netdev_send() on any of the newly configured queues, giving the provider
@@ -328,6 +324,11 @@  struct netdev_class {
      * packets.  If 'may_steal' is true, the caller transfers ownership of all
      * the packets to the network device, regardless of success.
      *
+     * If 'concurrent_txq' is true, the caller may perform concurrent calls
+     * to netdev_send() with the same 'qid'. The netdev provider is responsible
+     * for making sure that these concurrent calls do not create a race
+     * condition by using locking or other synchronization if required.
+     *
      * The network device is expected to maintain one or more packet
      * transmission queues, so that the caller does not ordinarily have to
      * do additional queuing of packets.  'qid' specifies the queue to use
@@ -341,7 +342,7 @@  struct netdev_class {
      * datapath".  It will also prevent the OVS implementation of bonding from
      * working properly over 'netdev'.) */
     int (*send)(struct netdev *netdev, int qid, struct dp_packet_batch *batch,
-                bool may_steal);
+                bool may_steal, bool concurrent_txq);
 
     /* Registers with the poll loop to wake up from the next call to
      * poll_block() when the packet transmission queue for 'netdev' has
diff --git a/lib/netdev.c b/lib/netdev.c
index 31a6a46..a792eb6 100644
--- a/lib/netdev.c
+++ b/lib/netdev.c
@@ -655,9 +655,6 @@  netdev_rxq_drain(struct netdev_rxq *rx)
  * otherwise a positive errno value.
  *
  * 'n_txq' specifies the exact number of transmission queues to create.
- * If this function returns successfully, the caller can make 'n_txq'
- * concurrent calls to netdev_send() (each one with a different 'qid' in the
- * range [0..'n_txq'-1]).
  *
  * The change might not effective immediately.  The caller must check if a
  * reconfiguration is required with netdev_is_reconf_required() and eventually
@@ -694,6 +691,11 @@  netdev_set_tx_multiq(struct netdev *netdev, unsigned int n_txq)
  * If 'may_steal' is true, the caller transfers ownership of all the packets
  * to the network device, regardless of success.
  *
+ * If 'concurrent_txq' is true, the caller may perform concurrent calls
+ * to netdev_send() with the same 'qid'. The netdev provider is responsible
+ * for making sure that these concurrent calls do not create a race condition
+ * by using locking or other synchronization if required.
+ *
  * The network device is expected to maintain one or more packet
  * transmission queues, so that the caller does not ordinarily have to
  * do additional queuing of packets.  'qid' specifies the queue to use
@@ -704,14 +706,15 @@  netdev_set_tx_multiq(struct netdev *netdev, unsigned int n_txq)
  * cases this function will always return EOPNOTSUPP. */
 int
 netdev_send(struct netdev *netdev, int qid, struct dp_packet_batch *batch,
-            bool may_steal)
+            bool may_steal, bool concurrent_txq)
 {
     if (!netdev->netdev_class->send) {
         dp_packet_delete_batch(batch, may_steal);
         return EOPNOTSUPP;
     }
 
-    int error = netdev->netdev_class->send(netdev, qid, batch, may_steal);
+    int error = netdev->netdev_class->send(netdev, qid, batch, may_steal,
+                                           concurrent_txq);
     if (!error) {
         COVERAGE_INC(netdev_sent);
         if (!may_steal) {
diff --git a/lib/netdev.h b/lib/netdev.h
index 591d861..dc7ede8 100644
--- a/lib/netdev.h
+++ b/lib/netdev.h
@@ -149,7 +149,7 @@  int netdev_rxq_drain(struct netdev_rxq *);
 
 /* Packet transmission. */
 int netdev_send(struct netdev *, int qid, struct dp_packet_batch *,
-                bool may_steal);
+                bool may_steal, bool concurrent_txq);
 void netdev_send_wait(struct netdev *, int qid);
 
 /* native tunnel APIs */