diff mbox

[ovs-dev,v5,1/4] dpif-netdev: XPS (Transmit Packet Steering) implementation.

Message ID 30E3A1C5-4CCF-4B2A-B47A-FE85CC1534EA@vmware.com
State Not Applicable
Headers show

Commit Message

Daniele Di Proietto July 27, 2016, 8 p.m. UTC
I don't think dynamic_txqs should be atomic, since we change it when the pmd threads are stopped.

Also, in port_create() we should check for 'netdev_n_txq(netdev) < n_cores + 1' after we reconfigure the device.

Other than that this looks good to me, so I applied the following incremental and pushed this to master.

Thanks,

Daniele


---8<---

---8<---


On 27/07/2016 07:44, "Ilya Maximets" <i.maximets@samsung.com> wrote:

>If CPU number in pmd-cpu-mask is not divisible by the number of queues and

>in a few more complex situations there may be unfair distribution of TX

>queue-ids between PMD threads.

>

>For example, if we have 2 ports with 4 queues and 6 CPUs in pmd-cpu-mask

>such distribution is possible:

><------------------------------------------------------------------------>

>pmd thread numa_id 0 core_id 13:

>        port: vhost-user1       queue-id: 1

>        port: dpdk0     queue-id: 3

>pmd thread numa_id 0 core_id 14:

>        port: vhost-user1       queue-id: 2

>pmd thread numa_id 0 core_id 16:

>        port: dpdk0     queue-id: 0

>pmd thread numa_id 0 core_id 17:

>        port: dpdk0     queue-id: 1

>pmd thread numa_id 0 core_id 12:

>        port: vhost-user1       queue-id: 0

>        port: dpdk0     queue-id: 2

>pmd thread numa_id 0 core_id 15:

>        port: vhost-user1       queue-id: 3

><------------------------------------------------------------------------>

>

>As we can see above dpdk0 port polled by threads on cores:

>	12, 13, 16 and 17.

>

>By design of dpif-netdev, there is only one TX queue-id assigned to each

>pmd thread. This queue-id's are sequential similar to core-id's. And

>thread will send packets to queue with exact this queue-id regardless

>of port.

>

>In previous example:

>

>	pmd thread on core 12 will send packets to tx queue 0

>	pmd thread on core 13 will send packets to tx queue 1

>	...

>	pmd thread on core 17 will send packets to tx queue 5

>

>So, for dpdk0 port after truncating in netdev-dpdk:

>

>	core 12 --> TX queue-id 0 % 4 == 0

>	core 13 --> TX queue-id 1 % 4 == 1

>	core 16 --> TX queue-id 4 % 4 == 0

>	core 17 --> TX queue-id 5 % 4 == 1

>

>As a result only 2 of 4 queues used.

>

>To fix this issue some kind of XPS implemented in following way:

>

>	* TX queue-ids are allocated dynamically.

>	* When PMD thread first time tries to send packets to new port

>	  it allocates less used TX queue for this port.

>	* PMD threads periodically performes revalidation of

>	  allocated TX queue-ids. If queue wasn't used in last

>	  XPS_TIMEOUT_MS milliseconds it will be freed while revalidation.

>        * XPS is not working if we have enough TX queues.

>

>Reported-by: Zhihong Wang <zhihong.wang@intel.com>

>Signed-off-by: Ilya Maximets <i.maximets@samsung.com>

>---

> lib/dpif-netdev.c     | 204 ++++++++++++++++++++++++++++++++++++++++----------

> lib/netdev-bsd.c      |   3 +-

> lib/netdev-dpdk.c     |  32 +++-----

> lib/netdev-dummy.c    |   3 +-

> lib/netdev-linux.c    |   3 +-

> lib/netdev-provider.h |  11 +--

> lib/netdev.c          |  13 ++--

> lib/netdev.h          |   2 +-

> 8 files changed, 198 insertions(+), 73 deletions(-)

>

>diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c

>index f05ca4e..d1ba6f3 100644

>--- a/lib/dpif-netdev.c

>+++ b/lib/dpif-netdev.c

>@@ -248,6 +248,8 @@ enum pmd_cycles_counter_type {

>     PMD_N_CYCLES

> };

> 

>+#define XPS_TIMEOUT_MS 500LL

>+

> /* A port in a netdev-based datapath. */

> struct dp_netdev_port {

>     odp_port_t port_no;

>@@ -256,6 +258,9 @@ struct dp_netdev_port {

>     struct netdev_saved_flags *sf;

>     unsigned n_rxq;             /* Number of elements in 'rxq' */

>     struct netdev_rxq **rxq;

>+    atomic_bool dynamic_txqs;   /* If true XPS will be used. */

>+    unsigned *txq_used;         /* Number of threads that uses each tx queue. */

>+    struct ovs_mutex txq_used_mutex;

>     char *type;                 /* Port type as requested by user. */

> };

> 

>@@ -384,8 +389,9 @@ struct rxq_poll {

> 

> /* Contained by struct dp_netdev_pmd_thread's 'port_cache' or 'tx_ports'. */

> struct tx_port {

>-    odp_port_t port_no;

>-    struct netdev *netdev;

>+    struct dp_netdev_port *port;

>+    int qid;

>+    long long last_used;

>     struct hmap_node node;

> };

> 

>@@ -443,9 +449,10 @@ struct dp_netdev_pmd_thread {

>     unsigned core_id;               /* CPU core id of this pmd thread. */

>     int numa_id;                    /* numa node id of this pmd thread. */

> 

>-    /* Queue id used by this pmd thread to send packets on all netdevs.

>-     * All tx_qid's are unique and less than 'ovs_numa_get_n_cores() + 1'. */

>-    atomic_int tx_qid;

>+    /* Queue id used by this pmd thread to send packets on all netdevs if

>+     * XPS disabled for this netdev. All static_tx_qid's are unique and less

>+     * than 'ovs_numa_get_n_cores() + 1'. */

>+    atomic_int static_tx_qid;

> 

>     struct ovs_mutex port_mutex;    /* Mutex for 'poll_list' and 'tx_ports'. */

>     /* List of rx queues to poll. */

>@@ -498,7 +505,8 @@ static void dp_netdev_execute_actions(struct dp_netdev_pmd_thread *pmd,

>                                       struct dp_packet_batch *,

>                                       bool may_steal,

>                                       const struct nlattr *actions,

>-                                      size_t actions_len);

>+                                      size_t actions_len,

>+                                      long long now);

> static void dp_netdev_input(struct dp_netdev_pmd_thread *,

>                             struct dp_packet_batch *, odp_port_t port_no);

> static void dp_netdev_recirculate(struct dp_netdev_pmd_thread *,

>@@ -541,6 +549,12 @@ static void dp_netdev_pmd_flow_flush(struct dp_netdev_pmd_thread *pmd);

> static void pmd_load_cached_ports(struct dp_netdev_pmd_thread *pmd)

>     OVS_REQUIRES(pmd->port_mutex);

> 

>+static void

>+dpif_netdev_xps_revalidate_pmd(const struct dp_netdev_pmd_thread *pmd,

>+                               long long now, bool purge);

>+static int dpif_netdev_xps_get_tx_qid(const struct dp_netdev_pmd_thread *pmd,

>+                                      struct tx_port *tx, long long now);

>+

> static inline bool emc_entry_alive(struct emc_entry *ce);

> static void emc_clear_entry(struct emc_entry *ce);

> 

>@@ -1138,6 +1152,7 @@ port_create(const char *devname, const char *open_type, const char *type,

>     struct netdev *netdev;

>     int n_open_rxqs = 0;

>     int i, error;

>+    bool dynamic_txqs = false;

> 

>     *portp = NULL;

> 

>@@ -1171,6 +1186,9 @@ port_create(const char *devname, const char *open_type, const char *type,

>             VLOG_ERR("%s, cannot set multiq", devname);

>             goto out;

>         }

>+        if (netdev_n_txq(netdev) < n_cores + 1) {

>+            dynamic_txqs = true;

>+        }

>     }

> 

>     if (netdev_is_reconf_required(netdev)) {

>@@ -1185,7 +1203,10 @@ port_create(const char *devname, const char *open_type, const char *type,

>     port->netdev = netdev;

>     port->n_rxq = netdev_n_rxq(netdev);

>     port->rxq = xcalloc(port->n_rxq, sizeof *port->rxq);

>+    port->txq_used = xcalloc(netdev_n_txq(netdev), sizeof *port->txq_used);

>     port->type = xstrdup(type);

>+    ovs_mutex_init(&port->txq_used_mutex);

>+    atomic_init(&port->dynamic_txqs, dynamic_txqs);

> 

>     for (i = 0; i < port->n_rxq; i++) {

>         error = netdev_rxq_open(netdev, &port->rxq[i], i);

>@@ -1211,7 +1232,9 @@ out_rxq_close:

>     for (i = 0; i < n_open_rxqs; i++) {

>         netdev_rxq_close(port->rxq[i]);

>     }

>+    ovs_mutex_destroy(&port->txq_used_mutex);

>     free(port->type);

>+    free(port->txq_used);

>     free(port->rxq);

>     free(port);

> 

>@@ -1351,7 +1374,8 @@ port_destroy(struct dp_netdev_port *port)

>     for (unsigned i = 0; i < port->n_rxq; i++) {

>         netdev_rxq_close(port->rxq[i]);

>     }

>-

>+    ovs_mutex_destroy(&port->txq_used_mutex);

>+    free(port->txq_used);

>     free(port->rxq);

>     free(port->type);

>     free(port);

>@@ -2476,7 +2500,7 @@ dpif_netdev_execute(struct dpif *dpif, struct dpif_execute *execute)

> 

>     packet_batch_init_packet(&pp, execute->packet);

>     dp_netdev_execute_actions(pmd, &pp, false, execute->actions,

>-                              execute->actions_len);

>+                              execute->actions_len, time_msec());

> 

>     if (pmd->core_id == NON_PMD_CORE_ID) {

>         ovs_mutex_unlock(&dp->non_pmd_mutex);

>@@ -2650,6 +2674,10 @@ port_reconfigure(struct dp_netdev_port *port)

>     }

>     /* If the netdev_reconfigure() above succeeds, reopens the 'rxq's. */

>     port->rxq = xrealloc(port->rxq, sizeof *port->rxq * netdev_n_rxq(netdev));

>+    /* Realloc 'used' counters for tx queues. */

>+    free(port->txq_used);

>+    port->txq_used = xcalloc(netdev_n_txq(netdev), sizeof *port->txq_used);

>+

>     for (i = 0; i < netdev_n_rxq(netdev); i++) {

>         err = netdev_rxq_open(netdev, &port->rxq[i], i);

>         if (err) {

>@@ -2666,9 +2694,21 @@ reconfigure_pmd_threads(struct dp_netdev *dp)

>     OVS_REQUIRES(dp->port_mutex)

> {

>     struct dp_netdev_port *port, *next;

>+    int n_cores;

> 

>     dp_netdev_destroy_all_pmds(dp);

> 

>+    /* Reconfigures the cpu mask. */

>+    ovs_numa_set_cpu_mask(dp->requested_pmd_cmask);

>+    free(dp->pmd_cmask);

>+    dp->pmd_cmask = nullable_xstrdup(dp->requested_pmd_cmask);

>+

>+    n_cores = ovs_numa_get_n_cores();

>+    if (n_cores == OVS_CORE_UNSPEC) {

>+        VLOG_ERR("Cannot get cpu core info");

>+        return;

>+    }

>+

>     HMAP_FOR_EACH_SAFE (port, next, node, &dp->ports) {

>         int err;

> 

>@@ -2677,13 +2717,11 @@ reconfigure_pmd_threads(struct dp_netdev *dp)

>             hmap_remove(&dp->ports, &port->node);

>             seq_change(dp->port_seq);

>             port_destroy(port);

>+        } else {

>+            atomic_init(&port->dynamic_txqs,

>+                        netdev_n_txq(port->netdev) < n_cores + 1);

>         }

>     }

>-    /* Reconfigures the cpu mask. */

>-    ovs_numa_set_cpu_mask(dp->requested_pmd_cmask);

>-    free(dp->pmd_cmask);

>-    dp->pmd_cmask = nullable_xstrdup(dp->requested_pmd_cmask);

>-

>     /* Restores the non-pmd. */

>     dp_netdev_set_nonpmd(dp);

>     /* Restores all pmd threads. */

>@@ -2727,6 +2765,7 @@ dpif_netdev_run(struct dpif *dpif)

>             }

>         }

>     }

>+    dpif_netdev_xps_revalidate_pmd(non_pmd, time_msec(), false);

>     ovs_mutex_unlock(&dp->non_pmd_mutex);

> 

>     dp_netdev_pmd_unref(non_pmd);

>@@ -2776,6 +2815,9 @@ pmd_free_cached_ports(struct dp_netdev_pmd_thread *pmd)

> {

>     struct tx_port *tx_port_cached;

> 

>+    /* Free all used tx queue ids. */

>+    dpif_netdev_xps_revalidate_pmd(pmd, 0, true);

>+

>     HMAP_FOR_EACH_POP (tx_port_cached, node, &pmd->port_cache) {

>         free(tx_port_cached);

>     }

>@@ -2795,7 +2837,7 @@ pmd_load_cached_ports(struct dp_netdev_pmd_thread *pmd)

>     HMAP_FOR_EACH (tx_port, node, &pmd->tx_ports) {

>         tx_port_cached = xmemdup(tx_port, sizeof *tx_port_cached);

>         hmap_insert(&pmd->port_cache, &tx_port_cached->node,

>-                    hash_port_no(tx_port_cached->port_no));

>+                    hash_port_no(tx_port_cached->port->port_no));

>     }

> }

> 

>@@ -3011,7 +3053,7 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,

>     pmd->numa_id = numa_id;

>     pmd->poll_cnt = 0;

> 

>-    atomic_init(&pmd->tx_qid,

>+    atomic_init(&pmd->static_tx_qid,

>                 (core_id == NON_PMD_CORE_ID)

>                 ? ovs_numa_get_n_cores()

>                 : get_n_pmd_threads(dp));

>@@ -3107,7 +3149,7 @@ dp_netdev_destroy_all_pmds(struct dp_netdev *dp)

> }

> 

> /* Deletes all pmd threads on numa node 'numa_id' and

>- * fixes tx_qids of other threads to keep them sequential. */

>+ * fixes static_tx_qids of other threads to keep them sequential. */

> static void

> dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int numa_id)

> {

>@@ -3125,7 +3167,7 @@ dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int numa_id)

>          * 'dp->poll_threads' (while we're iterating it) and it

>          * might quiesce. */

>         if (pmd->numa_id == numa_id) {

>-            atomic_read_relaxed(&pmd->tx_qid, &free_idx[k]);

>+            atomic_read_relaxed(&pmd->static_tx_qid, &free_idx[k]);

>             pmd_list[k] = pmd;

>             ovs_assert(k < n_pmds_on_numa);

>             k++;

>@@ -3140,12 +3182,12 @@ dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int numa_id)

>     CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {

>         int old_tx_qid;

> 

>-        atomic_read_relaxed(&pmd->tx_qid, &old_tx_qid);

>+        atomic_read_relaxed(&pmd->static_tx_qid, &old_tx_qid);

> 

>         if (old_tx_qid >= n_pmds) {

>             int new_tx_qid = free_idx[--k];

> 

>-            atomic_store_relaxed(&pmd->tx_qid, new_tx_qid);

>+            atomic_store_relaxed(&pmd->static_tx_qid, new_tx_qid);

>         }

>     }

> 

>@@ -3178,7 +3220,7 @@ tx_port_lookup(const struct hmap *hmap, odp_port_t port_no)

>     struct tx_port *tx;

> 

>     HMAP_FOR_EACH_IN_BUCKET (tx, node, hash_port_no(port_no), hmap) {

>-        if (tx->port_no == port_no) {

>+        if (tx->port->port_no == port_no) {

>             return tx;

>         }

>     }

>@@ -3303,11 +3345,11 @@ dp_netdev_add_port_tx_to_pmd(struct dp_netdev_pmd_thread *pmd,

> {

>     struct tx_port *tx = xzalloc(sizeof *tx);

> 

>-    tx->netdev = port->netdev;

>-    tx->port_no = port->port_no;

>+    tx->port = port;

>+    tx->qid = -1;

> 

>     ovs_mutex_lock(&pmd->port_mutex);

>-    hmap_insert(&pmd->tx_ports, &tx->node, hash_port_no(tx->port_no));

>+    hmap_insert(&pmd->tx_ports, &tx->node, hash_port_no(tx->port->port_no));

>     ovs_mutex_unlock(&pmd->port_mutex);

> }

> 

>@@ -3648,7 +3690,7 @@ packet_batch_per_flow_execute(struct packet_batch_per_flow *batch,

>     actions = dp_netdev_flow_get_actions(flow);

> 

>     dp_netdev_execute_actions(pmd, &batch->array, true,

>-                              actions->actions, actions->size);

>+                              actions->actions, actions->size, now);

> }

> 

> static inline void

>@@ -3736,7 +3778,7 @@ static inline void

> handle_packet_upcall(struct dp_netdev_pmd_thread *pmd, struct dp_packet *packet,

>                      const struct netdev_flow_key *key,

>                      struct ofpbuf *actions, struct ofpbuf *put_actions,

>-                     int *lost_cnt)

>+                     int *lost_cnt, long long now)

> {

>     struct ofpbuf *add_actions;

>     struct dp_packet_batch b;

>@@ -3775,7 +3817,7 @@ handle_packet_upcall(struct dp_netdev_pmd_thread *pmd, struct dp_packet *packet,

>      * we'll send the packet up twice. */

>     packet_batch_init_packet(&b, packet);

>     dp_netdev_execute_actions(pmd, &b, true,

>-                              actions->data, actions->size);

>+                              actions->data, actions->size, now);

> 

>     add_actions = put_actions->size ? put_actions : actions;

>     if (OVS_LIKELY(error != ENOSPC)) {

>@@ -3804,7 +3846,8 @@ static inline void

> fast_path_processing(struct dp_netdev_pmd_thread *pmd,

>                      struct dp_packet_batch *packets_,

>                      struct netdev_flow_key *keys,

>-                     struct packet_batch_per_flow batches[], size_t *n_batches)

>+                     struct packet_batch_per_flow batches[], size_t *n_batches,

>+                     long long now)

> {

>     int cnt = packets_->count;

> #if !defined(__CHECKER__) && !defined(_WIN32)

>@@ -3850,8 +3893,8 @@ fast_path_processing(struct dp_netdev_pmd_thread *pmd,

>             }

> 

>             miss_cnt++;

>-            handle_packet_upcall(pmd, packets[i], &keys[i], &actions, &put_actions,

>-                          &lost_cnt);

>+            handle_packet_upcall(pmd, packets[i], &keys[i], &actions,

>+                                 &put_actions, &lost_cnt, now);

>         }

> 

>         ofpbuf_uninit(&actions);

>@@ -3915,7 +3958,7 @@ dp_netdev_input__(struct dp_netdev_pmd_thread *pmd,

>                             md_is_valid, port_no);

>     if (OVS_UNLIKELY(newcnt)) {

>         packets->count = newcnt;

>-        fast_path_processing(pmd, packets, keys, batches, &n_batches);

>+        fast_path_processing(pmd, packets, keys, batches, &n_batches, now);

>     }

> 

>     for (i = 0; i < n_batches; i++) {

>@@ -3944,6 +3987,7 @@ dp_netdev_recirculate(struct dp_netdev_pmd_thread *pmd,

> 

> struct dp_netdev_execute_aux {

>     struct dp_netdev_pmd_thread *pmd;

>+    long long now;

> };

> 

> static void

>@@ -3964,6 +4008,79 @@ dpif_netdev_register_upcall_cb(struct dpif *dpif, upcall_callback *cb,

>     dp->upcall_cb = cb;

> }

> 

>+static void

>+dpif_netdev_xps_revalidate_pmd(const struct dp_netdev_pmd_thread *pmd,

>+                               long long now, bool purge)

>+{

>+    struct tx_port *tx;

>+    struct dp_netdev_port *port;

>+    long long interval;

>+    bool dynamic_txqs;

>+

>+    HMAP_FOR_EACH (tx, node, &pmd->port_cache) {

>+        atomic_read_relaxed(&tx->port->dynamic_txqs, &dynamic_txqs);

>+        if (dynamic_txqs) {

>+            continue;

>+        }

>+        interval = now - tx->last_used;

>+        if (tx->qid >= 0 && (purge || interval >= XPS_TIMEOUT_MS)) {

>+            port = tx->port;

>+            ovs_mutex_lock(&port->txq_used_mutex);

>+            port->txq_used[tx->qid]--;

>+            ovs_mutex_unlock(&port->txq_used_mutex);

>+            tx->qid = -1;

>+        }

>+    }

>+}

>+

>+static int

>+dpif_netdev_xps_get_tx_qid(const struct dp_netdev_pmd_thread *pmd,

>+                           struct tx_port *tx, long long now)

>+{

>+    struct dp_netdev_port *port;

>+    long long interval;

>+    int i, min_cnt, min_qid;

>+

>+    if (OVS_UNLIKELY(!now)) {

>+        now = time_msec();

>+    }

>+

>+    interval = now - tx->last_used;

>+    tx->last_used = now;

>+

>+    if (OVS_LIKELY(tx->qid >= 0 && interval < XPS_TIMEOUT_MS)) {

>+        return tx->qid;

>+    }

>+

>+    port = tx->port;

>+

>+    ovs_mutex_lock(&port->txq_used_mutex);

>+    if (tx->qid >= 0) {

>+        port->txq_used[tx->qid]--;

>+        tx->qid = -1;

>+    }

>+

>+    min_cnt = -1;

>+    min_qid = 0;

>+    for (i = 0; i < netdev_n_txq(port->netdev); i++) {

>+        if (port->txq_used[i] < min_cnt || min_cnt == -1) {

>+            min_cnt = port->txq_used[i];

>+            min_qid = i;

>+        }

>+    }

>+

>+    port->txq_used[min_qid]++;

>+    tx->qid = min_qid;

>+

>+    ovs_mutex_unlock(&port->txq_used_mutex);

>+

>+    dpif_netdev_xps_revalidate_pmd(pmd, now, false);

>+

>+    VLOG_DBG("Core %d: New TX queue ID %d for port \'%s\'.",

>+             pmd->core_id, tx->qid, netdev_get_name(tx->port->netdev));

>+    return min_qid;

>+}

>+

> static struct tx_port *

> pmd_tx_port_cache_lookup(const struct dp_netdev_pmd_thread *pmd,

>                          odp_port_t port_no)

>@@ -3987,7 +4104,7 @@ push_tnl_action(const struct dp_netdev_pmd_thread *pmd,

>         err = -EINVAL;

>         goto error;

>     }

>-    err = netdev_push_header(tun_port->netdev, batch, data);

>+    err = netdev_push_header(tun_port->port->netdev, batch, data);

>     if (!err) {

>         return 0;

>     }

>@@ -4001,7 +4118,7 @@ dp_execute_userspace_action(struct dp_netdev_pmd_thread *pmd,

>                             struct dp_packet *packet, bool may_steal,

>                             struct flow *flow, ovs_u128 *ufid,

>                             struct ofpbuf *actions,

>-                            const struct nlattr *userdata)

>+                            const struct nlattr *userdata, long long now)

> {

>     struct dp_packet_batch b;

>     int error;

>@@ -4014,7 +4131,7 @@ dp_execute_userspace_action(struct dp_netdev_pmd_thread *pmd,

>     if (!error || error == ENOSPC) {

>         packet_batch_init_packet(&b, packet);

>         dp_netdev_execute_actions(pmd, &b, may_steal,

>-                                  actions->data, actions->size);

>+                                  actions->data, actions->size, now);

>     } else if (may_steal) {

>         dp_packet_delete(packet);

>     }

>@@ -4029,6 +4146,7 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,

>     struct dp_netdev_pmd_thread *pmd = aux->pmd;

>     struct dp_netdev *dp = pmd->dp;

>     int type = nl_attr_type(a);

>+    long long now = aux->now;

>     struct tx_port *p;

> 

>     switch ((enum ovs_action_attr)type) {

>@@ -4036,10 +4154,17 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,

>         p = pmd_tx_port_cache_lookup(pmd, u32_to_odp(nl_attr_get_u32(a)));

>         if (OVS_LIKELY(p)) {

>             int tx_qid;

>+            bool dynamic_txqs;

> 

>-            atomic_read_relaxed(&pmd->tx_qid, &tx_qid);

>+            atomic_read_relaxed(&p->port->dynamic_txqs, &dynamic_txqs);

>+            if (dynamic_txqs) {

>+                tx_qid = dpif_netdev_xps_get_tx_qid(pmd, p, now);

>+            } else {

>+                atomic_read_relaxed(&pmd->static_tx_qid, &tx_qid);

>+            }

> 

>-            netdev_send(p->netdev, tx_qid, packets_, may_steal);

>+            netdev_send(p->port->netdev, tx_qid, packets_, may_steal,

>+                        dynamic_txqs);

>             return;

>         }

>         break;

>@@ -4086,7 +4211,7 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,

> 

>                 dp_packet_batch_apply_cutlen(packets_);

> 

>-                netdev_pop_header(p->netdev, packets_);

>+                netdev_pop_header(p->port->netdev, packets_);

>                 if (!packets_->count) {

>                     return;

>                 }

>@@ -4134,7 +4259,7 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,

>                 flow_extract(packets[i], &flow);

>                 dpif_flow_hash(dp->dpif, &flow, sizeof flow, &ufid);

>                 dp_execute_userspace_action(pmd, packets[i], may_steal, &flow,

>-                                            &ufid, &actions, userdata);

>+                                            &ufid, &actions, userdata, now);

>             }

> 

>             if (clone) {

>@@ -4200,9 +4325,10 @@ static void

> dp_netdev_execute_actions(struct dp_netdev_pmd_thread *pmd,

>                           struct dp_packet_batch *packets,

>                           bool may_steal,

>-                          const struct nlattr *actions, size_t actions_len)

>+                          const struct nlattr *actions, size_t actions_len,

>+                          long long now)

> {

>-    struct dp_netdev_execute_aux aux = { pmd };

>+    struct dp_netdev_execute_aux aux = { pmd, now };

> 

>     odp_execute_actions(&aux, packets, may_steal, actions,

>                         actions_len, dp_execute_cb);

>diff --git a/lib/netdev-bsd.c b/lib/netdev-bsd.c

>index f963c6e..2f57c1a 100644

>--- a/lib/netdev-bsd.c

>+++ b/lib/netdev-bsd.c

>@@ -680,7 +680,8 @@ netdev_bsd_rxq_drain(struct netdev_rxq *rxq_)

>  */

> static int

> netdev_bsd_send(struct netdev *netdev_, int qid OVS_UNUSED,

>-                struct dp_packet_batch *batch, bool may_steal)

>+                struct dp_packet_batch *batch, bool may_steal,

>+                bool concurrent_txq OVS_UNUSED)

> {

>     struct netdev_bsd *dev = netdev_bsd_cast(netdev_);

>     const char *name = netdev_get_name(netdev_);

>diff --git a/lib/netdev-dpdk.c b/lib/netdev-dpdk.c

>index af87f18..c208f32 100644

>--- a/lib/netdev-dpdk.c

>+++ b/lib/netdev-dpdk.c

>@@ -298,7 +298,7 @@ struct dpdk_tx_queue {

>     rte_spinlock_t tx_lock;        /* Protects the members and the NIC queue

>                                     * from concurrent access.  It is used only

>                                     * if the queue is shared among different

>-                                    * pmd threads (see 'txq_needs_locking'). */

>+                                    * pmd threads (see 'concurrent_txq'). */

>     int map;                       /* Mapping of configured vhost-user queues

>                                     * to enabled by guest. */

> };

>@@ -349,13 +349,6 @@ struct netdev_dpdk {

>     struct rte_eth_link link;

>     int link_reset_cnt;

> 

>-    /* Caller of netdev_send() might want to use more txqs than the device has.

>-     * For physical NICs, if the 'requested_n_txq' less or equal to 'up.n_txq',

>-     * 'txq_needs_locking' is false, otherwise it is true and we will take a

>-     * spinlock on transmission.  For vhost devices, 'requested_n_txq' is

>-     * always true.  */

>-    bool txq_needs_locking;

>-

>     /* virtio-net structure for vhost device */

>     OVSRCU_TYPE(struct virtio_net *) virtio_dev;

> 

>@@ -778,10 +771,8 @@ netdev_dpdk_init(struct netdev *netdev, unsigned int port_no,

>             goto unlock;

>         }

>         netdev_dpdk_alloc_txq(dev, netdev->n_txq);

>-        dev->txq_needs_locking = netdev->n_txq < dev->requested_n_txq;

>     } else {

>         netdev_dpdk_alloc_txq(dev, OVS_VHOST_MAX_QUEUE_NUM);

>-        dev->txq_needs_locking = true;

>         /* Enable DPDK_DEV_VHOST device and set promiscuous mode flag. */

>         dev->flags = NETDEV_UP | NETDEV_PROMISC;

>     }

>@@ -1468,7 +1459,7 @@ dpdk_do_tx_copy(struct netdev *netdev, int qid, struct dp_packet_batch *batch)

> static int

> netdev_dpdk_vhost_send(struct netdev *netdev, int qid,

>                        struct dp_packet_batch *batch,

>-                       bool may_steal)

>+                       bool may_steal, bool concurrent_txq OVS_UNUSED)

> {

> 

>     if (OVS_UNLIKELY(batch->packets[0]->source != DPBUF_DPDK)) {

>@@ -1484,9 +1475,10 @@ netdev_dpdk_vhost_send(struct netdev *netdev, int qid,

> 

> static inline void

> netdev_dpdk_send__(struct netdev_dpdk *dev, int qid,

>-                   struct dp_packet_batch *batch, bool may_steal)

>+                   struct dp_packet_batch *batch, bool may_steal,

>+                   bool concurrent_txq)

> {

>-    if (OVS_UNLIKELY(dev->txq_needs_locking)) {

>+    if (OVS_UNLIKELY(concurrent_txq)) {

>         qid = qid % dev->up.n_txq;

>         rte_spinlock_lock(&dev->tx_q[qid].tx_lock);

>     }

>@@ -1551,18 +1543,19 @@ netdev_dpdk_send__(struct netdev_dpdk *dev, int qid,

>         }

>     }

> 

>-    if (OVS_UNLIKELY(dev->txq_needs_locking)) {

>+    if (OVS_UNLIKELY(concurrent_txq)) {

>         rte_spinlock_unlock(&dev->tx_q[qid].tx_lock);

>     }

> }

> 

> static int

> netdev_dpdk_eth_send(struct netdev *netdev, int qid,

>-                     struct dp_packet_batch *batch, bool may_steal)

>+                     struct dp_packet_batch *batch, bool may_steal,

>+                     bool concurrent_txq)

> {

>     struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);

> 

>-    netdev_dpdk_send__(dev, qid, batch, may_steal);

>+    netdev_dpdk_send__(dev, qid, batch, may_steal, concurrent_txq);

>     return 0;

> }

> 

>@@ -2533,7 +2526,8 @@ dpdk_ring_open(const char dev_name[], unsigned int *eth_port_id)

> 

> static int

> netdev_dpdk_ring_send(struct netdev *netdev, int qid,

>-                      struct dp_packet_batch *batch, bool may_steal)

>+                      struct dp_packet_batch *batch, bool may_steal,

>+                      bool concurrent_txq)

> {

>     struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);

>     unsigned i;

>@@ -2546,7 +2540,7 @@ netdev_dpdk_ring_send(struct netdev *netdev, int qid,

>         dp_packet_rss_invalidate(batch->packets[i]);

>     }

> 

>-    netdev_dpdk_send__(dev, qid, batch, may_steal);

>+    netdev_dpdk_send__(dev, qid, batch, may_steal, concurrent_txq);

>     return 0;

> }

> 

>@@ -2823,8 +2817,6 @@ netdev_dpdk_reconfigure(struct netdev *netdev)

>     err = dpdk_eth_dev_init(dev);

>     netdev_dpdk_alloc_txq(dev, netdev->n_txq);

> 

>-    dev->txq_needs_locking = netdev->n_txq < dev->requested_n_txq;

>-

> out:

> 

>     ovs_mutex_unlock(&dev->mutex);

>diff --git a/lib/netdev-dummy.c b/lib/netdev-dummy.c

>index a95f7bb..813ce69 100644

>--- a/lib/netdev-dummy.c

>+++ b/lib/netdev-dummy.c

>@@ -1034,7 +1034,8 @@ netdev_dummy_rxq_drain(struct netdev_rxq *rxq_)

> 

> static int

> netdev_dummy_send(struct netdev *netdev, int qid OVS_UNUSED,

>-                  struct dp_packet_batch *batch, bool may_steal)

>+                  struct dp_packet_batch *batch, bool may_steal,

>+                  bool concurrent_txq OVS_UNUSED)

> {

>     struct netdev_dummy *dev = netdev_dummy_cast(netdev);

>     int error = 0;

>diff --git a/lib/netdev-linux.c b/lib/netdev-linux.c

>index c71a3df..2da8c18 100644

>--- a/lib/netdev-linux.c

>+++ b/lib/netdev-linux.c

>@@ -1161,7 +1161,8 @@ netdev_linux_rxq_drain(struct netdev_rxq *rxq_)

>  * expected to do additional queuing of packets. */

> static int

> netdev_linux_send(struct netdev *netdev_, int qid OVS_UNUSED,

>-                  struct dp_packet_batch *batch, bool may_steal)

>+                  struct dp_packet_batch *batch, bool may_steal,

>+                  bool concurrent_txq OVS_UNUSED)

> {

>     int i;

>     int error = 0;

>diff --git a/lib/netdev-provider.h b/lib/netdev-provider.h

>index 915a5a5..41fa9e7 100644

>--- a/lib/netdev-provider.h

>+++ b/lib/netdev-provider.h

>@@ -303,10 +303,6 @@ struct netdev_class {

>      * otherwise a positive errno value.

>      *

>      * 'n_txq' specifies the exact number of transmission queues to create.

>-     * The caller will call netdev_send() concurrently from 'n_txq' different

>-     * threads (with different qid).  The netdev provider is responsible for

>-     * making sure that these concurrent calls do not create a race condition

>-     * by using multiple hw queues or locking.

>      *

>      * The caller will call netdev_reconfigure() (if necessary) before using

>      * netdev_send() on any of the newly configured queues, giving the provider

>@@ -328,6 +324,11 @@ struct netdev_class {

>      * packets.  If 'may_steal' is true, the caller transfers ownership of all

>      * the packets to the network device, regardless of success.

>      *

>+     * If 'concurrent_txq' is true, the caller may perform concurrent calls

>+     * to netdev_send() with the same 'qid'. The netdev provider is responsible

>+     * for making sure that these concurrent calls do not create a race

>+     * condition by using locking or other synchronization if required.

>+     *

>      * The network device is expected to maintain one or more packet

>      * transmission queues, so that the caller does not ordinarily have to

>      * do additional queuing of packets.  'qid' specifies the queue to use

>@@ -341,7 +342,7 @@ struct netdev_class {

>      * datapath".  It will also prevent the OVS implementation of bonding from

>      * working properly over 'netdev'.) */

>     int (*send)(struct netdev *netdev, int qid, struct dp_packet_batch *batch,

>-                bool may_steal);

>+                bool may_steal, bool concurrent_txq);

> 

>     /* Registers with the poll loop to wake up from the next call to

>      * poll_block() when the packet transmission queue for 'netdev' has

>diff --git a/lib/netdev.c b/lib/netdev.c

>index 31a6a46..a792eb6 100644

>--- a/lib/netdev.c

>+++ b/lib/netdev.c

>@@ -655,9 +655,6 @@ netdev_rxq_drain(struct netdev_rxq *rx)

>  * otherwise a positive errno value.

>  *

>  * 'n_txq' specifies the exact number of transmission queues to create.

>- * If this function returns successfully, the caller can make 'n_txq'

>- * concurrent calls to netdev_send() (each one with a different 'qid' in the

>- * range [0..'n_txq'-1]).

>  *

>  * The change might not effective immediately.  The caller must check if a

>  * reconfiguration is required with netdev_is_reconf_required() and eventually

>@@ -694,6 +691,11 @@ netdev_set_tx_multiq(struct netdev *netdev, unsigned int n_txq)

>  * If 'may_steal' is true, the caller transfers ownership of all the packets

>  * to the network device, regardless of success.

>  *

>+ * If 'concurrent_txq' is true, the caller may perform concurrent calls

>+ * to netdev_send() with the same 'qid'. The netdev provider is responsible

>+ * for making sure that these concurrent calls do not create a race condition

>+ * by using locking or other synchronization if required.

>+ *

>  * The network device is expected to maintain one or more packet

>  * transmission queues, so that the caller does not ordinarily have to

>  * do additional queuing of packets.  'qid' specifies the queue to use

>@@ -704,14 +706,15 @@ netdev_set_tx_multiq(struct netdev *netdev, unsigned int n_txq)

>  * cases this function will always return EOPNOTSUPP. */

> int

> netdev_send(struct netdev *netdev, int qid, struct dp_packet_batch *batch,

>-            bool may_steal)

>+            bool may_steal, bool concurrent_txq)

> {

>     if (!netdev->netdev_class->send) {

>         dp_packet_delete_batch(batch, may_steal);

>         return EOPNOTSUPP;

>     }

> 

>-    int error = netdev->netdev_class->send(netdev, qid, batch, may_steal);

>+    int error = netdev->netdev_class->send(netdev, qid, batch, may_steal,

>+                                           concurrent_txq);

>     if (!error) {

>         COVERAGE_INC(netdev_sent);

>         if (!may_steal) {

>diff --git a/lib/netdev.h b/lib/netdev.h

>index 591d861..dc7ede8 100644

>--- a/lib/netdev.h

>+++ b/lib/netdev.h

>@@ -149,7 +149,7 @@ int netdev_rxq_drain(struct netdev_rxq *);

> 

> /* Packet transmission. */

> int netdev_send(struct netdev *, int qid, struct dp_packet_batch *,

>-                bool may_steal);

>+                bool may_steal, bool concurrent_txq);

> void netdev_send_wait(struct netdev *, int qid);

> 

> /* native tunnel APIs */

>-- 

>2.7.4
diff mbox

Patch

diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index d1ba6f3..d45aba0 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -258,7 +258,7 @@  struct dp_netdev_port {
     struct netdev_saved_flags *sf;
     unsigned n_rxq;             /* Number of elements in 'rxq' */
     struct netdev_rxq **rxq;
-    atomic_bool dynamic_txqs;   /* If true XPS will be used. */
+    bool dynamic_txqs;          /* If true XPS will be used. */
     unsigned *txq_used;         /* Number of threads that uses each tx queue. */
     struct ovs_mutex txq_used_mutex;
     char *type;                 /* Port type as requested by user. */
@@ -1151,6 +1151,7 @@  port_create(const char *devname, const char *open_type, const char *type,
     enum netdev_flags flags;
     struct netdev *netdev;
     int n_open_rxqs = 0;
+    int n_cores = 0;
     int i, error;
     bool dynamic_txqs = false;

@@ -1171,7 +1172,7 @@  port_create(const char *devname, const char *open_type, const char *type,
     }

     if (netdev_is_pmd(netdev)) {
-        int n_cores = ovs_numa_get_n_cores();
+        n_cores = ovs_numa_get_n_cores();

         if (n_cores == OVS_CORE_UNSPEC) {
             VLOG_ERR("%s, cannot get cpu core info", devname);
@@ -1186,9 +1187,6 @@  port_create(const char *devname, const char *open_type, const char *type,
             VLOG_ERR("%s, cannot set multiq", devname);
             goto out;
         }
-        if (netdev_n_txq(netdev) < n_cores + 1) {
-            dynamic_txqs = true;
-        }
     }

     if (netdev_is_reconf_required(netdev)) {
@@ -1198,6 +1196,12 @@  port_create(const char *devname, const char *open_type, const char *type,
         }
     }

+    if (netdev_is_pmd(netdev)) {
+        if (netdev_n_txq(netdev) < n_cores + 1) {
+            dynamic_txqs = true;
+        }
+    }
+
     port = xzalloc(sizeof *port);
     port->port_no = port_no;
     port->netdev = netdev;
@@ -1206,7 +1210,7 @@  port_create(const char *devname, const char *open_type, const char *type,
     port->txq_used = xcalloc(netdev_n_txq(netdev), sizeof *port->txq_used);
     port->type = xstrdup(type);
     ovs_mutex_init(&port->txq_used_mutex);
-    atomic_init(&port->dynamic_txqs, dynamic_txqs);
+    port->dynamic_txqs = dynamic_txqs;

     for (i = 0; i < port->n_rxq; i++) {
         error = netdev_rxq_open(netdev, &port->rxq[i], i);
@@ -2718,8 +2722,7 @@  reconfigure_pmd_threads(struct dp_netdev *dp)
             seq_change(dp->port_seq);
             port_destroy(port);
         } else {
-            atomic_init(&port->dynamic_txqs,
-                        netdev_n_txq(port->netdev) < n_cores + 1);
+            port->dynamic_txqs = netdev_n_txq(port->netdev) < n_cores + 1;
         }
     }
     /* Restores the non-pmd. */
@@ -4015,11 +4018,9 @@  dpif_netdev_xps_revalidate_pmd(const struct dp_netdev_pmd_thread *pmd,
     struct tx_port *tx;
     struct dp_netdev_port *port;
     long long interval;
-    bool dynamic_txqs;

     HMAP_FOR_EACH (tx, node, &pmd->port_cache) {
-        atomic_read_relaxed(&tx->port->dynamic_txqs, &dynamic_txqs);
-        if (dynamic_txqs) {
+        if (tx->port->dynamic_txqs) {
             continue;
         }
         interval = now - tx->last_used;
@@ -4156,7 +4157,7 @@  dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
             int tx_qid;
             bool dynamic_txqs;

-            atomic_read_relaxed(&p->port->dynamic_txqs, &dynamic_txqs);
+            dynamic_txqs = p->port->dynamic_txqs;
             if (dynamic_txqs) {
                 tx_qid = dpif_netdev_xps_get_tx_qid(pmd, p, now);
             } else {