diff mbox series

[ovs-dev,RFC,1/1] dpif-netdev: Add a per thread work ring

Message ID 20210414154427.56310-2-cian.ferriter@intel.com
State New
Headers show
Series Userspace deferral of work | expand

Commit Message

Ferriter, Cian April 14, 2021, 3:44 p.m. UTC
These work rings help with handling the asynchronous TX usecase. In this
usecase, netdev_send will be called, but packets won't be immediately
sent by the thread calling netdev_send, but instead handled by a
different resource. Since the TX is not instantaneous, the thread
calling netdev_send can't immediately free the packets being sent, or
report them as sent. Rather than the thread polling for completion of
the TX, it is desirable that the thread to move on and process more
packets.

The work ring serves as a FIFO queue to keep track of the asynchronous
TX calls that have been kicked off. The work ring is added/queued to when
netdev_send returns '-EINPROGRESS' indicating it kicked off an
asynchronous TX. The work ring is taken/dequeued from in 2 main cases:
1. In pmd_thread_main after processing every rxq assigned to the thread.
2. When the ring is full while trying to queue work.

Signed-off-by: Cian Ferriter <cian.ferriter@intel.com>
Co-authored-by: Harry van Haaren <harry.van.haaren@intel.com>
Signed-off-by: Harry van Haaren <harry.van.haaren@intel.com>
Co-authored-by: Sunil Pai G <sunil.pai.g@intel.com>
Signed-off-by: Sunil Pai G <sunil.pai.g@intel.com>
---
 lib/dpif-netdev-perf.c |  13 ++++-
 lib/dpif-netdev-perf.h |   7 +++
 lib/dpif-netdev.c      | 125 ++++++++++++++++++++++++++++++++++++++++-
 lib/netdev-dpdk.c      |  22 +++++---
 lib/netdev-provider.h  |  15 ++++-
 lib/netdev.c           |   3 +-
 6 files changed, 172 insertions(+), 13 deletions(-)
diff mbox series

Patch

diff --git a/lib/dpif-netdev-perf.c b/lib/dpif-netdev-perf.c
index 9560e7c3c..dfcb1d99f 100644
--- a/lib/dpif-netdev-perf.c
+++ b/lib/dpif-netdev-perf.c
@@ -274,9 +274,18 @@  pmd_perf_format_overall_stats(struct ds *str, struct pmd_perf_stats *s,
     if (tx_packets > 0) {
         ds_put_format(str,
             "  Tx packets:        %12"PRIu64"  (%.0f Kpps)\n"
-            "  Tx batches:        %12"PRIu64"  (%.2f pkts/batch)\n",
+            "  Tx batches:        %12"PRIu64"  (%.2f pkts/batch)\n"
+            "\n"
+            "  - Work deferred:                 %12"PRIu64"\n"
+            "  - Work deferred again:           %12"PRIu64"\n"
+            "  - Ring full when deferring work: %12"PRIu64"\n"
+            "  - Deferred work done:            %12"PRIu64"\n",
             tx_packets, (tx_packets / duration) / 1000,
-            tx_batches, 1.0 * tx_packets / tx_batches);
+            tx_batches, 1.0 * tx_packets / tx_batches,
+            stats[PMD_STAT_WORK_DEFER],
+            stats[PMD_STAT_WORK_RE_DEFER],
+            stats[PMD_STAT_WORK_R_FULL],
+            stats[PMD_STAT_WORK_DONE]);
     } else {
         ds_put_format(str, "  Tx packets:        %12d\n\n", 0);
     }
diff --git a/lib/dpif-netdev-perf.h b/lib/dpif-netdev-perf.h
index 72645b6b3..eb2f37dbd 100644
--- a/lib/dpif-netdev-perf.h
+++ b/lib/dpif-netdev-perf.h
@@ -74,6 +74,13 @@  enum pmd_stat_type {
                              * recirculation. */
     PMD_STAT_SENT_PKTS,     /* Packets that have been sent. */
     PMD_STAT_SENT_BATCHES,  /* Number of batches sent. */
+    PMD_STAT_WORK_DEFER,    /* Number of times that work was deferred. */
+    PMD_STAT_WORK_RE_DEFER, /* Number of times that already deferred work was
+                             * deferred again. */
+    PMD_STAT_WORK_R_FULL,   /* Number of times work ring was full when
+                             * deferring work. */
+    PMD_STAT_WORK_DONE,     /* Number of times that deferred work was
+                             * completed. */
     PMD_CYCLES_ITER_IDLE,   /* Cycles spent in idle iterations. */
     PMD_CYCLES_ITER_BUSY,   /* Cycles spent in busy iterations. */
     PMD_CYCLES_UPCALL,      /* Cycles spent processing upcalls. */
diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index 251788b04..57df07731 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -82,6 +82,9 @@ 
 #include "util.h"
 #include "uuid.h"
 
+/* TODO: Don't rely only on DPDK ring library in lib/dpif-netdev.c. */
+#include <rte_ring.h>
+
 VLOG_DEFINE_THIS_MODULE(dpif_netdev);
 
 /* Auto Load Balancing Defaults */
@@ -305,6 +308,20 @@  struct pmd_auto_lb {
     atomic_uint8_t rebalance_load_thresh;
 };
 
+/* Function definition for deferred work. */
+typedef int (*dp_defer_work_func)(struct netdev *netdev, int qid);
+
+/* Structure to track outstanding work for this netdev. */
+struct dp_defer_work_item {
+    dp_defer_work_func work_func;
+    void *netdev;
+    int qid;
+};
+
+struct dp_defer {
+    struct rte_ring *avail_work_ring;
+};
+
 /* Datapath based on the network device interface from netdev.h.
  *
  *
@@ -630,6 +647,7 @@  struct tx_port {
     long long flush_time;
     struct dp_packet_batch output_pkts;
     struct dp_netdev_rxq *output_pkts_rxqs[NETDEV_MAX_BURST];
+    dp_defer_work_func cached_work_func;
 };
 
 /* Contained by struct tx_bond 'member_buckets'. */
@@ -809,6 +827,9 @@  struct dp_netdev_pmd_thread {
 
     /* Next time when PMD should try RCU quiescing. */
     long long next_rcu_quiesce;
+
+    /* Structure to track deferred work in this thread. */
+    struct dp_defer defer;
 };
 
 /* Interface to netdev-based datapath. */
@@ -818,6 +839,66 @@  struct dpif_netdev {
     uint64_t last_port_seq;
 };
 
+static int32_t
+dp_defer_init(struct dp_netdev_pmd_thread *pmd)
+{
+#define RING_NAME_SIZE 20
+    struct dp_defer *defer = &pmd->defer;
+    char ring_name[RING_NAME_SIZE];
+
+    snprintf(ring_name, RING_NAME_SIZE, "pmd_id%u_work_ring", pmd->core_id);
+
+    /* Create SP/SC work ring. */
+    defer->avail_work_ring = rte_ring_create_elem(ring_name,
+                sizeof(struct dp_defer_work_item), /* Ring element size. */
+                128, /* Number of elements. */
+                0, /* Socket id. */
+                RING_F_SP_ENQ | RING_F_SC_DEQ); /* Flags. */
+
+    ovs_assert(defer->avail_work_ring);
+    VLOG_INFO("defer initialized, %p\n", defer->avail_work_ring);
+    return 0;
+}
+
+static inline int
+dp_defer_work(struct dp_netdev_pmd_thread *pmd,
+              struct dp_defer_work_item *work)
+{
+    int err = rte_ring_enqueue_elem(pmd->defer.avail_work_ring, work,
+                                    sizeof(struct dp_defer_work_item));
+    return err;
+}
+
+static inline int
+dp_defer_do_work(struct dp_netdev_pmd_thread *pmd)
+{
+    struct dp_defer_work_item work;
+
+    int err = rte_ring_dequeue_elem(pmd->defer.avail_work_ring, &work,
+                                    sizeof(struct dp_defer_work_item));
+    if (!err) {
+        int ret = work.work_func(work.netdev, work.qid);
+
+        if (ret == -EINPROGRESS) {
+            /* Defer the work again. Don't check the return status of
+             * dp_defer_work() since it can't fail. There will always be room
+             * to defer one piece of work after the dequeue above. */
+            dp_defer_work(pmd, &work);
+            pmd_perf_update_counter(&pmd->perf_stats, PMD_STAT_WORK_RE_DEFER,
+                                    1);
+        } else {
+            pmd_perf_update_counter(&pmd->perf_stats, PMD_STAT_WORK_DONE, 1);
+        }
+    }
+    return err;
+}
+
+static inline unsigned int
+dp_defer_work_count(struct dp_netdev_pmd_thread *pmd)
+{
+    return rte_ring_count(pmd->defer.avail_work_ring);
+}
+
 static int get_port_by_number(struct dp_netdev *dp, odp_port_t port_no,
                               struct dp_netdev_port **portp)
     OVS_REQUIRES(dp->port_mutex);
@@ -4661,6 +4742,7 @@  dp_netdev_pmd_flush_output_on_port(struct dp_netdev_pmd_thread *pmd,
     int output_cnt;
     bool dynamic_txqs;
     struct cycle_timer timer;
+    struct netdev *netdev;
     uint64_t cycles;
     uint32_t tx_flush_interval;
 
@@ -4676,7 +4758,28 @@  dp_netdev_pmd_flush_output_on_port(struct dp_netdev_pmd_thread *pmd,
     output_cnt = dp_packet_batch_size(&p->output_pkts);
     ovs_assert(output_cnt > 0);
 
-    netdev_send(p->port->netdev, tx_qid, &p->output_pkts, dynamic_txqs);
+    netdev = p->port->netdev;
+    int ret = netdev_send(netdev, tx_qid, &p->output_pkts,
+                          dynamic_txqs);
+
+    if (ret == -EINPROGRESS) {
+        struct dp_defer_work_item work = {
+            .work_func = p->cached_work_func,
+            .netdev = netdev,
+            .qid = tx_qid,
+        };
+
+        /* Defer the work. */
+        while (dp_defer_work(pmd, &work)) {
+            /* The work ring is full, try to make room by doing work. Doing
+             * work can fail to make room if the work has to be requeued. Keep
+             * trying to do work until there is room in the ring. */
+            dp_defer_do_work(pmd);
+            pmd_perf_update_counter(&pmd->perf_stats, PMD_STAT_WORK_R_FULL, 1);
+        }
+        pmd_perf_update_counter(&pmd->perf_stats, PMD_STAT_WORK_DEFER, 1);
+    }
+
     dp_packet_batch_init(&p->output_pkts);
 
     /* Update time of the next flush. */
@@ -4686,6 +4789,9 @@  dp_netdev_pmd_flush_output_on_port(struct dp_netdev_pmd_thread *pmd,
     ovs_assert(pmd->n_output_batches > 0);
     pmd->n_output_batches--;
 
+    /* The batch, and number of packets are updated as sent here, even though
+     * some packets might have been dropped, or are in transit asynchronously.
+     */
     pmd_perf_update_counter(&pmd->perf_stats, PMD_STAT_SENT_PKTS, output_cnt);
     pmd_perf_update_counter(&pmd->perf_stats, PMD_STAT_SENT_BATCHES, 1);
 
@@ -6068,10 +6174,19 @@  reload:
             tx_packets = dp_netdev_pmd_flush_output_packets(pmd, false);
         }
 
+        /* Try to do every piece of work on the work ring once. */
+        for (i = 0; i < dp_defer_work_count(pmd); i++) {
+            dp_defer_do_work(pmd);
+        }
+
         /* Do RCU synchronization at fixed interval.  This ensures that
          * synchronization would not be delayed long even at high load of
          * packet processing. */
         if (pmd->ctx.now > pmd->next_rcu_quiesce) {
+            /* Do any work outstanding on this PMD thread. */
+            while (!dp_defer_do_work(pmd)) {
+                continue;
+            }
             if (!ovsrcu_try_quiesce()) {
                 pmd->next_rcu_quiesce =
                     pmd->ctx.now + PMD_RCU_QUIESCE_INTERVAL;
@@ -6080,7 +6195,10 @@  reload:
 
         if (lc++ > 1024) {
             lc = 0;
-
+            /* Do any work outstanding on this PMD thread. */
+            while (!dp_defer_do_work(pmd)) {
+                continue;
+            }
             coverage_try_clear();
             dp_netdev_pmd_try_optimize(pmd, poll_list, poll_cnt);
             if (!ovsrcu_try_quiesce()) {
@@ -6572,6 +6690,8 @@  dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
     pmd_perf_stats_init(&pmd->perf_stats);
     cmap_insert(&dp->poll_threads, CONST_CAST(struct cmap_node *, &pmd->node),
                 hash_int(core_id, 0));
+
+    dp_defer_init(pmd);
 }
 
 static void
@@ -6741,6 +6861,7 @@  dp_netdev_add_port_tx_to_pmd(struct dp_netdev_pmd_thread *pmd,
     tx->qid = -1;
     tx->flush_time = 0LL;
     dp_packet_batch_init(&tx->output_pkts);
+    tx->cached_work_func = port->netdev->netdev_class->process_async;
 
     hmap_insert(&pmd->tx_ports, &tx->node, hash_port_no(tx->port->port_no));
     pmd->need_reload = true;
diff --git a/lib/netdev-dpdk.c b/lib/netdev-dpdk.c
index 9d8096668..1094c82ce 100644
--- a/lib/netdev-dpdk.c
+++ b/lib/netdev-dpdk.c
@@ -2585,7 +2585,7 @@  netdev_dpdk_vhost_update_tx_counters(struct netdev_dpdk *dev,
     }
 }
 
-static void
+static int
 __netdev_dpdk_vhost_send(struct netdev *netdev, int qid,
                          struct dp_packet **pkts, int cnt)
 {
@@ -2667,6 +2667,8 @@  out:
     for (i = 0; i < n_packets_to_free; i++) {
         dp_packet_delete(pkts[i]);
     }
+
+    return 0;
 }
 
 static void
@@ -2774,7 +2776,7 @@  dpdk_copy_dp_packet_to_mbuf(struct rte_mempool *mp, struct dp_packet *pkt_orig)
 }
 
 /* Tx function. Transmit packets indefinitely */
-static void
+static int
 dpdk_do_tx_copy(struct netdev *netdev, int qid, struct dp_packet_batch *batch)
     OVS_NO_THREAD_SAFETY_ANALYSIS
 {
@@ -2793,6 +2795,7 @@  dpdk_do_tx_copy(struct netdev *netdev, int qid, struct dp_packet_batch *batch)
     uint32_t tx_failure = 0;
     uint32_t mtu_drops = 0;
     uint32_t qos_drops = 0;
+    int ret = 0;
 
     if (dev->type != DPDK_DEV_VHOST) {
         /* Check if QoS has been configured for this netdev. */
@@ -2826,7 +2829,7 @@  dpdk_do_tx_copy(struct netdev *netdev, int qid, struct dp_packet_batch *batch)
 
     if (OVS_LIKELY(txcnt)) {
         if (dev->type == DPDK_DEV_VHOST) {
-            __netdev_dpdk_vhost_send(netdev, qid, pkts, txcnt);
+            ret = __netdev_dpdk_vhost_send(netdev, qid, pkts, txcnt);
         } else {
             tx_failure += netdev_dpdk_eth_tx_burst(dev, qid,
                                                    (struct rte_mbuf **)pkts,
@@ -2843,6 +2846,8 @@  dpdk_do_tx_copy(struct netdev *netdev, int qid, struct dp_packet_batch *batch)
         sw_stats->tx_qos_drops += qos_drops;
         rte_spinlock_unlock(&dev->stats_lock);
     }
+
+    return ret;
 }
 
 static int
@@ -2851,14 +2856,15 @@  netdev_dpdk_vhost_send(struct netdev *netdev, int qid,
                        bool concurrent_txq OVS_UNUSED)
 {
 
+    int ret = 0;
     if (OVS_UNLIKELY(batch->packets[0]->source != DPBUF_DPDK)) {
-        dpdk_do_tx_copy(netdev, qid, batch);
+        ret = dpdk_do_tx_copy(netdev, qid, batch);
         dp_packet_delete_batch(batch, true);
     } else {
-        __netdev_dpdk_vhost_send(netdev, qid, batch->packets,
-                                 dp_packet_batch_size(batch));
+        ret = __netdev_dpdk_vhost_send(netdev, qid, batch->packets,
+                                       dp_packet_batch_size(batch));
     }
-    return 0;
+    return ret;
 }
 
 static inline void
@@ -5348,6 +5354,7 @@  static const struct netdev_class dpdk_vhost_class = {
     .construct = netdev_dpdk_vhost_construct,
     .destruct = netdev_dpdk_vhost_destruct,
     .send = netdev_dpdk_vhost_send,
+    .process_async = NULL,
     .get_carrier = netdev_dpdk_vhost_get_carrier,
     .get_stats = netdev_dpdk_vhost_get_stats,
     .get_custom_stats = netdev_dpdk_get_sw_custom_stats,
@@ -5364,6 +5371,7 @@  static const struct netdev_class dpdk_vhost_client_class = {
     .destruct = netdev_dpdk_vhost_destruct,
     .set_config = netdev_dpdk_vhost_client_set_config,
     .send = netdev_dpdk_vhost_send,
+    .process_async = NULL,
     .get_carrier = netdev_dpdk_vhost_get_carrier,
     .get_stats = netdev_dpdk_vhost_get_stats,
     .get_custom_stats = netdev_dpdk_get_sw_custom_stats,
diff --git a/lib/netdev-provider.h b/lib/netdev-provider.h
index 73dce2fca..a7c7d2a2b 100644
--- a/lib/netdev-provider.h
+++ b/lib/netdev-provider.h
@@ -384,7 +384,11 @@  struct netdev_class {
      * if it would always return EOPNOTSUPP anyhow.  (This will prevent the
      * network device from being usefully used by the netdev-based "userspace
      * datapath".  It will also prevent the OVS implementation of bonding from
-     * working properly over 'netdev'.) */
+     * working properly over 'netdev'.)
+     *
+     * May return EINPROGRESS. This indicates that the netdev has more work to
+     * do, and needs to have process_async called before sending buffers is
+     * totally completed. */
     int (*send)(struct netdev *netdev, int qid, struct dp_packet_batch *batch,
                 bool concurrent_txq);
 
@@ -402,6 +406,15 @@  struct netdev_class {
      * implement packet transmission through the 'send' member function. */
     void (*send_wait)(struct netdev *netdev, int qid);
 
+    /* Performs asynchronous work required by the netdev to complete sending
+     * the buffers. The work done in the process_async function is netdev
+     * specific, but could include freeing packets or updating port stats.
+     *
+     * May return EINPROGRESS if the async call still hasn't completed,
+     * indicating process_async should be called on this function again in the
+     * future. */
+    int (*process_async)(struct netdev *netdev, int qid);
+
     /* Sets 'netdev''s Ethernet address to 'mac' */
     int (*set_etheraddr)(struct netdev *netdev, const struct eth_addr mac);
 
diff --git a/lib/netdev.c b/lib/netdev.c
index 91e91955c..7b10bec34 100644
--- a/lib/netdev.c
+++ b/lib/netdev.c
@@ -892,7 +892,8 @@  netdev_send(struct netdev *netdev, int qid, struct dp_packet_batch *batch,
     }
 
     error = netdev->netdev_class->send(netdev, qid, batch, concurrent_txq);
-    if (!error) {
+    /* For async, treat netdev_send as called when -EINPROGRESS is returned. */
+    if (!error || error == -EINPROGRESS) {
         COVERAGE_INC(netdev_sent);
     }
     return error;