
[ovs-dev,v3] netdev-dpdk: Implement Tx intermediate queue for dpdk ports.

Message ID 1486073677-39063-1-git-send-email-bhanuprakash.bodireddy@intel.com
State Superseded

Commit Message

Bodireddy, Bhanuprakash Feb. 2, 2017, 10:14 p.m. UTC
After packet classification, packets are queued into batches depending
on the matching netdev flow. Thereafter each batch is processed to
execute the related actions. This becomes particularly inefficient if
there are only a few packets in each batch, as rte_eth_tx_burst() incurs
expensive MMIO writes.

This commit adds back the intermediate queue implementation. Packets are
queued and burst when the packet count exceeds a threshold. The drain
logic is also refactored to handle packets left hanging in the tx queues.
Testing shows significant performance gains with this implementation.

Fixes: b59cc14e032d ("netdev-dpdk: Use instant sending instead of
queueing of packets.")
Signed-off-by: Bhanuprakash Bodireddy <bhanuprakash.bodireddy@intel.com>
Signed-off-by: Antonio Fischetti <antonio.fischetti@intel.com>
Co-authored-by: Antonio Fischetti <antonio.fischetti@intel.com>
Signed-off-by: Markus Magnusson <markus.magnusson@ericsson.com>
Co-authored-by: Markus Magnusson <markus.magnusson@ericsson.com>
---
v2->v3
  * Refactor the code
  * Use thread-local copy 'send_port_cache' instead of 'tx_port' while draining
  * Invoke dp_netdev_drain_txq_ports() to drain the packets from the queue as
    part of the pmd reconfiguration that gets triggered due to port
    addition/deletion or a change in pmd-cpu-mask.
  * Invoke netdev_txq_drain() from xps_get_tx_qid() to drain packets in the old
    queue. This is needed in the XPS case, where the tx queue can change after
    a timeout.
  * Fix another bug in netdev_dpdk_eth_tx_burst() w.r.t 'txq->count'.

Latency stats:
Latency stats were collected for the PHY2PHY loopback case using 30 IXIA
streams of unidirectional UDP traffic. All stats are in nanoseconds. The
results below compare latency between master and the patch.

case 1: Matching IP flow rules for each IXIA stream
        E.g., for an IXIA stream with src IP 2.2.2.1, dst IP 5.5.5.1:
        ovs-ofctl add-flow br0 dl_type=0x0800,nw_src=2.2.2.1,actions=output:2

        For an IXIA stream with src IP 4.4.4.1, dst IP 15.15.15.1:
        ovs-ofctl add-flow br0 dl_type=0x0800,nw_src=4.4.4.1,actions=output:2

Packet        64             128             256             512             
Branch	Master  Patch	Master  Patch	Master	Patch   Master  Patch	
case 1  26100   222000  26190   217930  23890   199000  30370   212440 (min latency ns)
        1239100 906910  1168740 691040  575470  574240  724360  734050 (max latency ns)
        1189501	763908  913602  662941  486187  440482  470060  479376 (avg latency ns)

 
           1024              1280             1518
       Master   Patch     Master Patch      Master Patch
       28320    189610    26520  220580     23950  200480  (min latency ns)
       701040   67584670  670390 19783490   685930 747040  (max latency ns)
       444033   469297    415602 506215     429587 491593  (avg latency ns)


case 2: ovs-ofctl add-flow br0 in_port=1,action=output:2

Packet        64             128             256             512             
Branch	Master	Patch	Master	Patch	Master	Patch	Master	Patch	
case 2  18800   33970   19980   30350  22610    26800   13500   20220
        506140  596690  363010  363370  544520  541570  549120  77414700
        459509  473536  254817  256801  287872  287277  290642  301572


           1024              1280             1518
       Master   Patch     Master Patch      Master Patch
       22530    15850     21350  36020      25970  34300
       549680   131964240 543390 81549210   552060 98207410
       292436   294388    285468 305727     295133 300080


Case 3 is the same as case 1, but with INTERIM_QUEUE_BURST_THRESHOLD=16 instead of 32.

(w) patch
case 3    64       128       256    512    1024       1280    1518 
         122700   119890   135200  117530  118900     116640  123710(min)
         972830   808960   574180  696820  36717550   720500  726790(max)
         783315   674814   463256  439412  467041     463093  471967(avg)

Case 4 is the same as case 2, but with INTERIM_QUEUE_BURST_THRESHOLD=16 instead of 32.

(w) patch
case 4    64       128       256    512    1024       1280      1518 
         31750    26140    25250   17570   14750      28600     31460(min ns)
         722690   363200   539760  538320  301845040  12556210  132114800(max ns)
         485710   253497   285589  284095  293189     282834    285829(avg ns)
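
(For cases 3 and 4 the threshold change amounts to a one-line local edit in
lib/netdev-dpdk.c, e.g. the following; illustrative only, not part of the
patch:)

    /* Test-only change for cases 3 and 4: lower the burst threshold from
     * NETDEV_MAX_BURST (32) to 16. */
    #define INTERIM_QUEUE_BURST_THRESHOLD 16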

v1->v2
  * xps_get_tx_qid() is no longer called twice. The last used qid is stored so
    that the drain function flushes the right queue also when XPS is enabled.
  * netdev_txq_drain() is called unconditionally and not just for dpdk ports.
  * txq_drain() takes the 'tx_lock' for the queue in case of dynamic tx queues.
  * Restored counting of dropped packets.
  * Changed scheduling of drain function.
  * Updated comments in netdev-provider.h
  * Fixed a comment in dp-packet.h

Details:
 * In the worst-case scenario, with only a few packets per batch, a
   significant bottleneck is observed in the netdev_dpdk_eth_send() function
   due to expensive MMIO writes.

 * It is also observed that the CPI (cycles per instruction) rate for the
   function stood between 3.15 and 4.1, which is significantly higher than the
   acceptable limit of 1.0 for HPC applications and the theoretical limit of
   0.25 (the backend pipeline can retire 4 micro-operations per cycle).

 * With this patch, the CPI for netdev_dpdk_eth_send() is at 0.55 and the
   overall throughput improves significantly.
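
In outline, the send path now buffers packets per tx queue and only issues the
burst when the buffer fills, while a periodic drain flushes any leftovers.
The sketch below is condensed from the diff further down; the names
tx_enqueue_sketch/tx_drain_sketch are placeholders, the real functions are
netdev_dpdk_eth_tx_queue() and netdev_dpdk_txq_drain(), and locking, QoS and
stats handling are omitted here.

    /* Condensed sketch of the two new paths; see the diff below for the
     * real implementation. */
    static inline int
    tx_enqueue_sketch(struct netdev_dpdk *dev, int qid,
                      struct rte_mbuf **pkts, int cnt)
    {
        struct dpdk_tx_queue *txq = &dev->tx_q[qid];
        int i = 0, dropped = 0;

        while (i < cnt) {
            int freeslots = INTERIM_QUEUE_BURST_THRESHOLD - txq->count;
            int tocopy = MIN(freeslots, cnt - i);

            /* Buffer the packets instead of writing to the NIC right away. */
            memcpy(&txq->burst_pkts[txq->count], &pkts[i],
                   tocopy * sizeof(struct rte_mbuf *));
            txq->count += tocopy;
            i += tocopy;

            /* Only a full buffer triggers the expensive MMIO write. */
            if (txq->count >= INTERIM_QUEUE_BURST_THRESHOLD) {
                dropped += netdev_dpdk_eth_tx_burst(dev, qid, txq->burst_pkts,
                                                    txq->count);
            }
        }
        return dropped;
    }

    /* Periodic drain: push out packets that stayed below the threshold. */
    static int
    tx_drain_sketch(struct netdev_dpdk *dev, int qid)
    {
        struct dpdk_tx_queue *txq = &dev->tx_q[qid];

        if (txq->count) {
            netdev_dpdk_eth_tx_burst(dev, qid, txq->burst_pkts, txq->count);
        }
        return 0;
    }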

 lib/dp-packet.h       |  2 +-
 lib/dpif-netdev.c     | 66 ++++++++++++++++++++++++++++++++++++--
 lib/netdev-bsd.c      |  1 +
 lib/netdev-dpdk.c     | 87 ++++++++++++++++++++++++++++++++++++++++++++++-----
 lib/netdev-dummy.c    |  1 +
 lib/netdev-linux.c    |  1 +
 lib/netdev-provider.h |  8 +++++
 lib/netdev-vport.c    |  3 +-
 lib/netdev.c          |  9 ++++++
 lib/netdev.h          |  1 +
 10 files changed, 166 insertions(+), 13 deletions(-)

Comments

Bodireddy, Bhanuprakash April 14, 2017, 7:40 p.m. UTC | #1
The latency stats published in v3 (https://mail.openvswitch.org/pipermail/ovs-dev/2017-February/328363.html)
seem to be erroneous due to the way the RFC2544 test was configured in IXIA.  Please find the updated
latency stats below.  Only case 1 and case 2 stats are published, with a burst size of 32.

BTW, while calculating latency, the stats parameter was set to 'Cut Through', meaning latency is
measured as first bit in, first bit out.  The acceptable frame loss is set to 1%.  Note that the
results below are aggregated over approximately 9 iterations.

Benchmarks were done on the same commit (83ede47a48eb92053f66815e462e94a39d8a1f2c)
as v3.

Case 1:   Matching IP flow rules for each IXIA stream
###############################################################
Packet size        64               128              256              512
             Master   Patch    Master   Patch    Master   Patch    Master   Patch
Min (ns)      25360  199000     30260  208890     23490  131320     19620  118700
Max (ns)     854260  577600    868680  302440    197420  195090    160930  184740
Avg (ns)     384182  261213    412612  262091    190386  166025    133661  154787

                  1024             1280             1518
             Master   Patch    Master   Patch    Master   Patch
Min (ns)      20290  180650     30370  157260     19680  147550
Max (ns)     304290  239750    178570  216650    199140  209050
Avg (ns)     260350  209316    149328  185930    170091  177033


case 2: ovs-ofctl add-flow br0 in_port=1,action=output:2
###############################################################
Packet size        64               128              256              512
             Master   Patch    Master   Patch    Master   Patch    Master   Patch
Min (ns)      27870   30680     13080   29160     12000   18970     14520   14610
Max (ns)     323790  205930    282360  289470     39170   51610     48340   80670
Avg (ns)     162219  163582     40685   41677     21582   41546     35017   66192

                  1024             1280             1518
             Master   Patch    Master   Patch    Master   Patch
Min (ns)      10820   29670     11270   24740     11510   24780
Max (ns)      29480   70300     29900   39010     32460   40010
Avg (ns)      18926   54582     19239   30636     19087   16722

Regards,
Bhanuprakash. 

>-----Original Message-----
>From: Bodireddy, Bhanuprakash
>Sent: Thursday, February 2, 2017 10:15 PM
>To: dev@openvswitch.org
>Cc: i.maximets@samsung.com; ktraynor@redhat.com; diproiettod@ovn.org;
>Bodireddy, Bhanuprakash <bhanuprakash.bodireddy@intel.com>; Fischetti,
>Antonio <antonio.fischetti@intel.com>; Markus Magnusson
><markus.magnusson@ericsson.com>
>Subject: [PATCH v3] netdev-dpdk: Implement Tx intermediate queue for
>dpdk ports.
>
>After packet classification, packets are queued in to batches depending on the
>matching netdev flow. Thereafter each batch is processed to execute the
>related actions. This becomes particularly inefficient if there are few packets
>in each batch as rte_eth_tx_burst() incurs expensive MMIO writes.
>
>This commit adds back intermediate queue implementation. Packets are
>queued and burst when the packet count exceeds threshold. Also drain logic
>is refactored to handle packets hanging in the tx queues. Testing shows
>significant performance gains with this implementation.
>
>Fixes: b59cc14e032d("netdev-dpdk: Use instant sending instead of queueing
>of packets.")
>Signed-off-by: Bhanuprakash Bodireddy
><bhanuprakash.bodireddy@intel.com>>
>Signed-off-by: Antonio Fischetti <antonio.fischetti@intel.com>
>Co-authored-by: Antonio Fischetti <antonio.fischetti@intel.com>
>Signed-off-by: Markus Magnusson <markus.magnusson@ericsson.com>
>Co-authored-by: Markus Magnusson <markus.magnusson@ericsson.com>
>---
>v2->v3
>  * Refactor the code
>  * Use thread local copy 'send_port_cache' instead of 'tx_port' while draining
>  * Invoke dp_netdev_drain_txq_port() to drain the packets from the queue
>as
>    part of pmd reconfiguration that gets triggered due to port
>addition/deletion
>    or change in pmd-cpu-mask.
>  * Invoke netdev_txq_drain() from xps_get_tx_qid() to drain packets in old
>    queue. This is possible in XPS case where the tx queue can change after
>    timeout.
>  * Fix another bug in netdev_dpdk_eth_tx_burst() w.r.t 'txq->count'.
>
>Latency stats:
>Collected the latency stats with PHY2PHY loopback case using 30 IXIA streams
>/UDP packets/uni direction traffic. All the stats are in nanoseconds. Results
>below compare latency results between Master vs patch.
>
>case 1: Matching IP flow rules for each IXIA stream
>        Eg:  For an IXIA stream with src Ip: 2.2.2.1, dst tip: 5.5.5.1
>        ovs-ofctl add-flow br0 dl_type=0x0800,nw_src=2.2.2.1,actions=output:2
>
>        For an IXIA stream with src Ip: 4.4.4.1, dst tip: 15.15.15.1
>        ovs-ofctl add-flow br0 dl_type=0x0800,nw_src=4.4.4.1,actions=output:2
>
>Packet        64             128             256             512
>Branch	Master  Patch	Master  Patch	Master	Patch   Master  Patch
>case 1  26100   222000  26190   217930  23890   199000  30370   212440 (min
>latency ns)
>        1239100 906910  1168740 691040  575470  574240  724360  734050 (max
>latency ns)
>        1189501	763908  913602  662941  486187  440482  470060  479376 (avg
>latency ns)
>
>
>           1024              1280             1518
>       Master   Patch     Master Patch      Master Patch
>       28320    189610    26520  220580     23950  200480  (min latency ns)
>       701040   67584670  670390 19783490   685930 747040  (max latency ns)
>       444033   469297    415602 506215     429587 491593  (avg latency ns)
>
>
>case 2: ovs-ofctl add-flow br0 in_port=1,action=output:2
>
>Packet        64             128             256             512
>Branch	Master	Patch	Master	Patch	Master	Patch	Master	Patch
>case 2  18800   33970   19980   30350  22610    26800   13500   20220
>        506140  596690  363010  363370  544520  541570  549120  77414700
>        459509  473536  254817  256801  287872  287277  290642  301572
>
>
>           1024              1280             1518
>       Master   Patch     Master Patch      Master Patch
>       22530    15850     21350  36020      25970  34300
>       549680   131964240 543390 81549210   552060 98207410
>       292436   294388    285468 305727     295133 300080
>
>
>case 3 is same as case 1 with INTERIM_QUEUE_BURST_THRESHOLD=16,
>instead of 32.
>
>(w) patch
>case 3    64       128       256    512    1024       1280    1518
>         122700   119890   135200  117530  118900     116640  123710(min)
>         972830   808960   574180  696820  36717550   720500  726790(max)
>         783315   674814   463256  439412  467041     463093  471967(avg)
>
>case 4 is same as case 2 with INTERIM_QUEUE_BURST_THRESHOLD=16,
>instead of 32.
>
>(w) patch
>case 4    64       128       256    512    1024       1280      1518
>         31750    26140    25250   17570   14750      28600     31460(min ns)
>         722690   363200   539760  538320  301845040  12556210  132114800(max ns)
>         485710   253497   285589  284095  293189     282834    285829(avg ns)
>
>v1->v2
>  * xps_get_tx_qid() is no more called twice. The last used qid is stored so
>    the drain function will flush the right queue also when XPS is enabled.
>  * netdev_txq_drain() is called unconditionally and not just for dpdk ports.
>  * txq_drain() takes the 'tx_lock' for queue in case of dynamic tx queues.
>  * Restored counting of dropped packets.
>  * Changed scheduling of drain function.
>  * Updated comments in netdev-provider.h
>  * Fixed a comment in dp-packet.h
>
>Details:
> * In worst case scenario with fewer packets in batch, significant
>   bottleneck is observed at netdev_dpdk_eth_send() function due to
>   expensive MMIO writes.
>
> * Also its observed that CPI(cycles per instruction) Rate for the function
>   stood between 3.15 and 4.1 which is significantly higher than acceptable
>   limit of 1.0 for HPC applications and theoretical limit of 0.25 (As Backend
>   pipeline can retire 4 micro-operations in a cycle).
>
> * With this patch, CPI for netdev_dpdk_eth_send() is at 0.55 and the overall
>   throughput improved significantly.
>
> lib/dp-packet.h       |  2 +-
> lib/dpif-netdev.c     | 66 ++++++++++++++++++++++++++++++++++++--
> lib/netdev-bsd.c      |  1 +
> lib/netdev-dpdk.c     | 87
>++++++++++++++++++++++++++++++++++++++++++++++-----
> lib/netdev-dummy.c    |  1 +
> lib/netdev-linux.c    |  1 +
> lib/netdev-provider.h |  8 +++++
> lib/netdev-vport.c    |  3 +-
> lib/netdev.c          |  9 ++++++
> lib/netdev.h          |  1 +
> 10 files changed, 166 insertions(+), 13 deletions(-)
>
>diff --git a/lib/dp-packet.h b/lib/dp-packet.h index 17b7026..9e3912a 100644
>--- a/lib/dp-packet.h
>+++ b/lib/dp-packet.h
>@@ -39,7 +39,7 @@ enum OVS_PACKED_ENUM dp_packet_source {
>     DPBUF_STACK,               /* Un-movable stack space or static buffer. */
>     DPBUF_STUB,                /* Starts on stack, may expand into heap. */
>     DPBUF_DPDK,                /* buffer data is from DPDK allocated memory.
>-                                * ref to build_dp_packet() in netdev-dpdk. */
>+                                * Ref dp_packet_init_dpdk() in
>+ dp-packet.c */
> };
>
> #define DP_PACKET_CONTEXT_SIZE 64
>diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c index 719a518..b0d47fa
>100644
>--- a/lib/dpif-netdev.c
>+++ b/lib/dpif-netdev.c
>@@ -289,6 +289,8 @@ struct dp_netdev_rxq {
>     struct dp_netdev_pmd_thread *pmd;  /* pmd thread that will poll this
>queue. */  };
>
>+#define LAST_USED_QID_NONE -1
>+
> /* A port in a netdev-based datapath. */  struct dp_netdev_port {
>     odp_port_t port_no;
>@@ -437,8 +439,14 @@ struct rxq_poll {
> struct tx_port {
>     struct dp_netdev_port *port;
>     int qid;
>-    long long last_used;
>+    long long last_used;        /* In case XPS is enabled, it contains the
>+				 * timestamp of the last time the port was
>+				 * used by the thread to send data.  After
>+				 * XPS_TIMEOUT_MS elapses the qid will be
>+				 * marked as -1. */
>     struct hmap_node node;
>+    int last_used_qid;          /* Last queue id where packets could be
>+                                   enqueued. */
> };
>
> /* PMD: Poll modes drivers.  PMD accesses devices via polling to eliminate
>@@ -2900,6 +2908,25 @@ cycles_count_end(struct dp_netdev_pmd_thread
>*pmd,  }
>
> static void
>+dp_netdev_drain_txq_ports(struct dp_netdev_pmd_thread *pmd) {
>+    struct tx_port *cached_tx_port;
>+    int tx_qid;
>+
>+    HMAP_FOR_EACH (cached_tx_port, node, &pmd->send_port_cache) {
>+        tx_qid = cached_tx_port->last_used_qid;
>+
>+        if (tx_qid != LAST_USED_QID_NONE) {
>+            netdev_txq_drain(cached_tx_port->port->netdev, tx_qid,
>+                         cached_tx_port->port->dynamic_txqs);
>+
>+            /* Queue drained and mark it empty. */
>+            cached_tx_port->last_used_qid = LAST_USED_QID_NONE;
>+        }
>+    }
>+}
>+
>+static void
> dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
>                            struct netdev_rxq *rx,
>                            odp_port_t port_no) @@ -3514,15 +3541,18 @@
>pmd_load_queues_and_ports(struct dp_netdev_pmd_thread *pmd,
>     return i;
> }
>
>+enum { DRAIN_TSC = 20000ULL };
>+
> static void *
> pmd_thread_main(void *f_)
> {
>     struct dp_netdev_pmd_thread *pmd = f_;
>-    unsigned int lc = 0;
>+    unsigned int lc = 0, lc_drain = 0;
>     struct polled_queue *poll_list;
>     bool exiting;
>     int poll_cnt;
>     int i;
>+    uint64_t prev = 0, now = 0;
>
>     poll_list = NULL;
>
>@@ -3555,6 +3585,17 @@ reload:
>                                        poll_list[i].port_no);
>         }
>
>+#define MAX_LOOP_TO_DRAIN 128
>+        if (lc_drain++ > MAX_LOOP_TO_DRAIN) {
>+            lc_drain = 0;
>+            prev = now;
>+            now = pmd->last_cycles;
>+
>+            if ((now - prev) > DRAIN_TSC) {
>+                dp_netdev_drain_txq_ports(pmd);
>+            }
>+        }
>+
>         if (lc++ > 1024) {
>             bool reload;
>
>@@ -3573,6 +3614,9 @@ reload:
>         }
>     }
>
>+    /* Drain the queues as part of reconfiguration */
>+    dp_netdev_drain_txq_ports(pmd);
>+
>     poll_cnt = pmd_load_queues_and_ports(pmd, &poll_list);
>     exiting = latch_is_set(&pmd->exit_latch);
>     /* Signal here to make sure the pmd finishes @@ -3890,6 +3934,7 @@
>dp_netdev_add_port_tx_to_pmd(struct dp_netdev_pmd_thread *pmd,
>
>     tx->port = port;
>     tx->qid = -1;
>+    tx->last_used_qid = LAST_USED_QID_NONE;
>
>     hmap_insert(&pmd->tx_ports, &tx->node, hash_port_no(tx->port-
>>port_no));
>     pmd->need_reload = true;
>@@ -4454,6 +4499,14 @@ dpif_netdev_xps_get_tx_qid(const struct
>dp_netdev_pmd_thread *pmd,
>
>     dpif_netdev_xps_revalidate_pmd(pmd, now, false);
>
>+    /* The tx queue can change in XPS case, make sure packets in previous
>+     * queue is drained properly. */
>+    if (tx->last_used_qid != LAST_USED_QID_NONE &&
>+               tx->qid != tx->last_used_qid) {
>+        netdev_txq_drain(port->netdev, tx->last_used_qid, port-
>>dynamic_txqs);
>+        tx->last_used_qid = LAST_USED_QID_NONE;
>+    }
>+
>     VLOG_DBG("Core %d: New TX queue ID %d for port \'%s\'.",
>              pmd->core_id, tx->qid, netdev_get_name(tx->port->netdev));
>     return min_qid;
>@@ -4548,6 +4601,13 @@ dp_execute_cb(void *aux_, struct
>dp_packet_batch *packets_,
>                 tx_qid = pmd->static_tx_qid;
>             }
>
>+            /* In case these packets gets buffered into an intermediate
>+             * queue and XPS is enabled the drain function could find a
>+             * different Tx qid assigned to its thread.  We keep track
>+             * of the qid we're now using, that will trigger the drain
>+             * function and will select the right queue to flush. */
>+            p->last_used_qid = tx_qid;
>+
>             netdev_send(p->port->netdev, tx_qid, packets_, may_steal,
>                         dynamic_txqs);
>             return;
>@@ -4960,7 +5020,7 @@ dpif_dummy_register(enum dummy_level level)
>                              "dp port new-number",
>                              3, 3, dpif_dummy_change_port_number, NULL);  }
>-

>+
> /* Datapath Classifier. */
>
> /* A set of rules that all have the same fields wildcarded. */ diff --git
>a/lib/netdev-bsd.c b/lib/netdev-bsd.c index 94c515d..00d5263 100644
>--- a/lib/netdev-bsd.c
>+++ b/lib/netdev-bsd.c
>@@ -1547,6 +1547,7 @@ netdev_bsd_update_flags(struct netdev *netdev_,
>enum netdev_flags off,
>     netdev_bsd_rxq_recv,                             \
>     netdev_bsd_rxq_wait,                             \
>     netdev_bsd_rxq_drain,                            \
>+    NULL,                                            \
> }
>
> const struct netdev_class netdev_bsd_class = diff --git a/lib/netdev-dpdk.c
>b/lib/netdev-dpdk.c index 94568a1..3def755 100644
>--- a/lib/netdev-dpdk.c
>+++ b/lib/netdev-dpdk.c
>@@ -166,7 +166,6 @@ static const struct rte_eth_conf port_conf = {
>
> enum { DPDK_RING_SIZE = 256 };
> BUILD_ASSERT_DECL(IS_POW2(DPDK_RING_SIZE));
>-enum { DRAIN_TSC = 200000ULL };
>
> enum dpdk_dev_type {
>     DPDK_DEV_ETH = 0,
>@@ -286,15 +285,26 @@ struct dpdk_mp {
>     struct ovs_list list_node OVS_GUARDED_BY(dpdk_mp_mutex);  };
>
>+/* Queue 'INTERIM_QUEUE_BURST_THRESHOLD' packets before
>tranmitting.
>+ * Defaults to 'NETDEV_MAX_BURST'(32) now.
>+ */
>+#define INTERIM_QUEUE_BURST_THRESHOLD NETDEV_MAX_BURST
>+
> /* There should be one 'struct dpdk_tx_queue' created for
>  * each cpu core. */
> struct dpdk_tx_queue {
>+    int count;                     /* Number of buffered packets waiting to
>+                                      be sent. */
>     rte_spinlock_t tx_lock;        /* Protects the members and the NIC queue
>                                     * from concurrent access.  It is used only
>                                     * if the queue is shared among different
>                                     * pmd threads (see 'concurrent_txq'). */
>     int map;                       /* Mapping of configured vhost-user queues
>                                     * to enabled by guest. */
>+    struct rte_mbuf *burst_pkts[INTERIM_QUEUE_BURST_THRESHOLD];
>+                                   /* Intermediate queues where packets can
>+                                    * be buffered to amortize the cost of MMIO
>+                                    * writes. */
> };
>
> /* dpdk has no way to remove dpdk ring ethernet devices @@ -1381,6
>+1391,7 @@ static inline int  netdev_dpdk_eth_tx_burst(struct netdev_dpdk
>*dev, int qid,
>                          struct rte_mbuf **pkts, int cnt)  {
>+    struct dpdk_tx_queue *txq = &dev->tx_q[qid];
>     uint32_t nb_tx = 0;
>
>     while (nb_tx != cnt) {
>@@ -1404,6 +1415,7 @@ netdev_dpdk_eth_tx_burst(struct netdev_dpdk
>*dev, int qid,
>         }
>     }
>
>+    txq->count = 0;
>     return cnt - nb_tx;
> }
>
>@@ -1788,12 +1800,42 @@ dpdk_do_tx_copy(struct netdev *netdev, int qid,
>struct dp_packet_batch *batch)
>     }
> }
>
>+/* Enqueue packets in an intermediate queue and call the burst
>+ * function when the queue is full.  This way we can amortize the
>+ * cost of MMIO writes. */
>+static inline int
>+netdev_dpdk_eth_tx_queue(struct netdev_dpdk *dev, int qid,
>+                            struct rte_mbuf **pkts, int cnt) {
>+    struct dpdk_tx_queue *txq = &dev->tx_q[qid];
>+
>+    int i = 0;
>+    int dropped = 0;
>+
>+    while (i < cnt) {
>+        int freeslots = INTERIM_QUEUE_BURST_THRESHOLD - txq->count;
>+        int tocopy = MIN(freeslots, cnt-i);
>+
>+        memcpy(&txq->burst_pkts[txq->count], &pkts[i],
>+               tocopy * sizeof (struct rte_mbuf *));
>+
>+        txq->count += tocopy;
>+        i += tocopy;
>+
>+        /* Queue full, burst the packets */
>+        if (txq->count >= INTERIM_QUEUE_BURST_THRESHOLD) {
>+           dropped += netdev_dpdk_eth_tx_burst(dev, qid, txq->burst_pkts,
>+                   txq->count);
>+        }
>+    }
>+    return dropped;
>+}
>+
> static int
> netdev_dpdk_vhost_send(struct netdev *netdev, int qid,
>                        struct dp_packet_batch *batch,
>                        bool may_steal, bool concurrent_txq OVS_UNUSED)  {
>-
>     if (OVS_UNLIKELY(!may_steal || batch->packets[0]->source !=
>DPBUF_DPDK)) {
>         dpdk_do_tx_copy(netdev, qid, batch);
>         dp_packet_delete_batch(batch, may_steal); @@ -1836,7 +1878,7 @@
>netdev_dpdk_send__(struct netdev_dpdk *dev, int qid,
>         cnt = netdev_dpdk_qos_run(dev, pkts, cnt);
>         dropped = batch->count - cnt;
>
>-        dropped += netdev_dpdk_eth_tx_burst(dev, qid, pkts, cnt);
>+        dropped += netdev_dpdk_eth_tx_queue(dev, qid, pkts, cnt);
>
>         if (OVS_UNLIKELY(dropped)) {
>             rte_spinlock_lock(&dev->stats_lock);
>@@ -1850,6 +1892,30 @@ netdev_dpdk_send__(struct netdev_dpdk *dev,
>int qid,
>     }
> }
>
>+/* Drain tx queues, this is called periodically to empty the
>+ * intermediate queue in case of few packets (<
>+INTERIM_QUEUE_BURST_THRESHOLD)
>+ * are buffered into the queue. */
>+static int
>+netdev_dpdk_txq_drain(struct netdev *netdev, int qid, bool
>+concurrent_txq) {
>+    struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
>+    struct dpdk_tx_queue *txq = &dev->tx_q[qid];
>+
>+    if (OVS_LIKELY(txq->count)) {
>+        if (OVS_UNLIKELY(concurrent_txq)) {
>+            qid = qid % dev->up.n_txq;
>+            rte_spinlock_lock(&dev->tx_q[qid].tx_lock);
>+        }
>+
>+        netdev_dpdk_eth_tx_burst(dev, qid, txq->burst_pkts,
>+ txq->count);
>+
>+        if (OVS_UNLIKELY(concurrent_txq)) {
>+            rte_spinlock_unlock(&dev->tx_q[qid].tx_lock);
>+        }
>+    }
>+    return 0;
>+}
>+
> static int
> netdev_dpdk_eth_send(struct netdev *netdev, int qid,
>                      struct dp_packet_batch *batch, bool may_steal, @@ -3243,7
>+3309,7 @@ unlock:
>                           SET_CONFIG, SET_TX_MULTIQ, SEND,    \
>                           GET_CARRIER, GET_STATS,             \
>                           GET_FEATURES, GET_STATUS,           \
>-                          RECONFIGURE, RXQ_RECV)              \
>+                          RECONFIGURE, RXQ_RECV, TXQ_DRAIN)   \
> {                                                             \
>     NAME,                                                     \
>     true,                       /* is_pmd */                  \
>@@ -3310,6 +3376,7 @@ unlock:
>     RXQ_RECV,                                                 \
>     NULL,                       /* rx_wait */                 \
>     NULL,                       /* rxq_drain */               \
>+    TXQ_DRAIN,      /* txq_drain */                           \
> }
>
> static const struct netdev_class dpdk_class = @@ -3326,7 +3393,8 @@ static
>const struct netdev_class dpdk_class =
>         netdev_dpdk_get_features,
>         netdev_dpdk_get_status,
>         netdev_dpdk_reconfigure,
>-        netdev_dpdk_rxq_recv);
>+        netdev_dpdk_rxq_recv,
>+        netdev_dpdk_txq_drain);
>
> static const struct netdev_class dpdk_ring_class =
>     NETDEV_DPDK_CLASS(
>@@ -3342,7 +3410,8 @@ static const struct netdev_class dpdk_ring_class =
>         netdev_dpdk_get_features,
>         netdev_dpdk_get_status,
>         netdev_dpdk_reconfigure,
>-        netdev_dpdk_rxq_recv);
>+        netdev_dpdk_rxq_recv,
>+        NULL);
>
> static const struct netdev_class dpdk_vhost_class =
>     NETDEV_DPDK_CLASS(
>@@ -3358,7 +3427,8 @@ static const struct netdev_class dpdk_vhost_class =
>         NULL,
>         NULL,
>         netdev_dpdk_vhost_reconfigure,
>-        netdev_dpdk_vhost_rxq_recv);
>+        netdev_dpdk_vhost_rxq_recv,
>+        NULL);
> static const struct netdev_class dpdk_vhost_client_class =
>     NETDEV_DPDK_CLASS(
>         "dpdkvhostuserclient",
>@@ -3373,7 +3443,8 @@ static const struct netdev_class
>dpdk_vhost_client_class =
>         NULL,
>         NULL,
>         netdev_dpdk_vhost_client_reconfigure,
>-        netdev_dpdk_vhost_rxq_recv);
>+        netdev_dpdk_vhost_rxq_recv,
>+        NULL);
>
> void
> netdev_dpdk_register(void)
>diff --git a/lib/netdev-dummy.c b/lib/netdev-dummy.c index
>0657434..4ef659e 100644
>--- a/lib/netdev-dummy.c
>+++ b/lib/netdev-dummy.c
>@@ -1409,6 +1409,7 @@ netdev_dummy_update_flags(struct netdev
>*netdev_,
>     netdev_dummy_rxq_recv,                                      \
>     netdev_dummy_rxq_wait,                                      \
>     netdev_dummy_rxq_drain,                                     \
>+    NULL,                                                       \
> }
>
> static const struct netdev_class dummy_class = diff --git a/lib/netdev-linux.c
>b/lib/netdev-linux.c index 9ff1333..79478ee 100644
>--- a/lib/netdev-linux.c
>+++ b/lib/netdev-linux.c
>@@ -2830,6 +2830,7 @@ netdev_linux_update_flags(struct netdev
>*netdev_, enum netdev_flags off,
>     netdev_linux_rxq_recv,                                      \
>     netdev_linux_rxq_wait,                                      \
>     netdev_linux_rxq_drain,                                     \
>+    NULL,                                                       \
> }
>
> const struct netdev_class netdev_linux_class = diff --git a/lib/netdev-
>provider.h b/lib/netdev-provider.h index 8346fc4..97e72c6 100644
>--- a/lib/netdev-provider.h
>+++ b/lib/netdev-provider.h
>@@ -335,6 +335,11 @@ struct netdev_class {
>      * If the function returns a non-zero value, some of the packets might have
>      * been sent anyway.
>      *
>+     * Some netdev provider - like in case of 'dpdk' - may buffer the batch
>+     * of packets into an intermediate queue.  Buffered packets will be sent
>+     * out when their number will exceed a threshold or by the periodic call
>+     * to the drain function.
>+     *
>      * If 'may_steal' is false, the caller retains ownership of all the
>      * packets.  If 'may_steal' is true, the caller transfers ownership of all
>      * the packets to the network device, regardless of success.
>@@ -769,6 +774,9 @@ struct netdev_class {
>
>     /* Discards all packets waiting to be received from 'rx'. */
>     int (*rxq_drain)(struct netdev_rxq *rx);
>+
>+    /* Drain all packets waiting to be sent on queue 'qid'. */
>+    int (*txq_drain)(struct netdev *netdev, int qid, bool
>+ concurrent_txq);
> };
>
> int netdev_register_provider(const struct netdev_class *); diff --git
>a/lib/netdev-vport.c b/lib/netdev-vport.c index 2d0aa43..64cf617 100644
>--- a/lib/netdev-vport.c
>+++ b/lib/netdev-vport.c
>@@ -847,7 +847,8 @@ get_stats(const struct netdev *netdev, struct
>netdev_stats *stats)
>     NULL,                   /* rx_dealloc */                \
>     NULL,                   /* rx_recv */                   \
>     NULL,                   /* rx_wait */                   \
>-    NULL,                   /* rx_drain */
>+    NULL,                   /* rx_drain */                  \
>+    NULL,                   /* tx_drain */
>
>
> #define TUNNEL_CLASS(NAME, DPIF_PORT, BUILD_HEADER, PUSH_HEADER,
>POP_HEADER)   \
>diff --git a/lib/netdev.c b/lib/netdev.c index 1e6bb2b..5e0c53f 100644
>--- a/lib/netdev.c
>+++ b/lib/netdev.c
>@@ -670,6 +670,15 @@ netdev_rxq_drain(struct netdev_rxq *rx)
>             : 0);
> }
>
>+/* Flush packets on the queue 'qid'. */ int netdev_txq_drain(struct
>+netdev *netdev, int qid, bool netdev_txq_drain) {
>+    return (netdev->netdev_class->txq_drain
>+            ? netdev->netdev_class->txq_drain(netdev, qid, netdev_txq_drain)
>+            : EOPNOTSUPP);
>+}
>+
> /* Configures the number of tx queues of 'netdev'. Returns 0 if successful,
>  * otherwise a positive errno value.
>  *
>diff --git a/lib/netdev.h b/lib/netdev.h index d6c07c1..7ddd790 100644
>--- a/lib/netdev.h
>+++ b/lib/netdev.h
>@@ -155,6 +155,7 @@ int netdev_rxq_drain(struct netdev_rxq *);  int
>netdev_send(struct netdev *, int qid, struct dp_packet_batch *,
>                 bool may_steal, bool concurrent_txq);  void netdev_send_wait(struct
>netdev *, int qid);
>+int netdev_txq_drain(struct netdev *, int qid, bool concurrent_txq);
>
> /* native tunnel APIs */
> /* Structure to pass parameters required to build a tunnel header. */
>--
>2.4.11
Eelco Chaudron April 18, 2017, 12:30 p.m. UTC | #2
Hi Bhanuprakash,

I was doing some Physical to Virtual tests, and whenever the number of flows
reached the rx batch size, performance dropped a lot. I created an
experimental patch where I added an intermediate queue and flushed it at the
end of the rx batch.

When I found your patch I decided to give it a try to see how it behaves.
I also modified your patch in such a way that it flushes the queue
after every call to dp_netdev_process_rxq_port().
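
Roughly, my modification places the drain inside the rx poll loop in
pmd_thread_main(), something like the sketch below (the 'poll_list[i].rx'
field name is assumed from the hunks quoted further down; my actual change
may differ slightly):

    /* Illustrative sketch only: flush the intermediate queues after every
     * rx batch instead of waiting for the periodic DRAIN_TSC timeout. */
    for (i = 0; i < poll_cnt; i++) {
        dp_netdev_process_rxq_port(pmd, poll_list[i].rx,
                                   poll_list[i].port_no);
        /* Drain whatever the just-processed batch left buffered. */
        dp_netdev_drain_txq_ports(pmd);
    }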

Here are some pkt forwarding stats for the Physical to Physical scenario,
for two 82599ES 10G ports with 64-byte packets being sent at wire speed:

Number       plain                 patch +
of flows   git clone     patch      flush
========   =========   =========  =========
     10     10727283    13527752   13393844
     32      7042253    11285572   11228799
     50      7515491     9642650    9607791
    100      5838699     9461239    9430730
    500      5285066     7859123    7845807
   1000      5226477     7146404    7135601


I do not have an IXIA to do the latency tests you performed, however I
do have a XENA tester which has a basic latency measurement feature.
I used the following script to get the latency numbers:

https://github.com/chaudron/XenaPythonLib/blob/latency/examples/latency.py


As you can see in the numbers below, the default queue introduces quite
some latency; however, doing the flush every rx batch brings the latency
down to almost the original values. The results mimic your test case 2,
sending 10G traffic @ wire speed:

   ===== GIT CLONE
   Pkt size  min(ns)  avg(ns)  max(ns)
    512      4,631      5,022    309,914
   1024      5,545      5,749    104,294
   1280      5,978      6,159     45,306
   1518      6,419      6,774    946,850

   ===== PATCH
   Pkt size  min(ns)  avg(ns)  max(ns)
    512      4,928    492,228  1,995,026
   1024      5,761    499,206  2,006,628
   1280      6,186    497,975  1,986,175
   1518      6,579    494,434  2,005,947

   ===== PATCH + FLUSH
   Pkt size  min(ns)  avg(ns)  max(ns)
    512      4,711      5,064    182,477
   1024      5,601      5,888    701,654
   1280      6,018      6,491    533,037
   1518      6,467      6,734    312,471

Maybe it would be good to re-run your latency tests with the flush for every
rx batch. This might get rid of the huge latency while still increasing the
performance in the case where the rx batch shares the same egress port.

The overall patchset looks fine to me, see some comments inline.

Cheers,

Eelco


On 02/02/17 23:14, Bhanuprakash Bodireddy wrote:
> After packet classification, packets are queued in to batches depending
> on the matching netdev flow. Thereafter each batch is processed to
> execute the related actions. This becomes particularly inefficient if
> there are few packets in each batch as rte_eth_tx_burst() incurs expensive
> MMIO writes.
>
> This commit adds back intermediate queue implementation. Packets are
> queued and burst when the packet count exceeds threshold. Also drain
> logic is refactored to handle packets hanging in the tx queues. Testing
> shows significant performance gains with this implementation.
>
> Fixes: b59cc14e032d("netdev-dpdk: Use instant sending instead of
> queueing of packets.")
> Signed-off-by: Bhanuprakash Bodireddy <bhanuprakash.bodireddy@intel.com>>
> Signed-off-by: Antonio Fischetti <antonio.fischetti@intel.com>
> Co-authored-by: Antonio Fischetti <antonio.fischetti@intel.com>
> Signed-off-by: Markus Magnusson <markus.magnusson@ericsson.com>
> Co-authored-by: Markus Magnusson <markus.magnusson@ericsson.com>
> ---
> v2->v3
>    * Refactor the code
>    * Use thread local copy 'send_port_cache' instead of 'tx_port' while draining
>    * Invoke dp_netdev_drain_txq_port() to drain the packets from the queue as
>      part of pmd reconfiguration that gets triggered due to port addition/deletion
>      or change in pmd-cpu-mask.
>    * Invoke netdev_txq_drain() from xps_get_tx_qid() to drain packets in old
>      queue. This is possible in XPS case where the tx queue can change after
>      timeout.
>    * Fix another bug in netdev_dpdk_eth_tx_burst() w.r.t 'txq->count'.
>
> Latency stats:
> Collected the latency stats with PHY2PHY loopback case using 30 IXIA streams
> /UDP packets/uni direction traffic. All the stats are in nanoseconds. Results
> below compare latency results between Master vs patch.
>
> case 1: Matching IP flow rules for each IXIA stream
>          Eg:  For an IXIA stream with src Ip: 2.2.2.1, dst tip: 5.5.5.1
>          ovs-ofctl add-flow br0 dl_type=0x0800,nw_src=2.2.2.1,actions=output:2
>
>          For an IXIA stream with src Ip: 4.4.4.1, dst tip: 15.15.15.1
>          ovs-ofctl add-flow br0 dl_type=0x0800,nw_src=4.4.4.1,actions=output:2
>
> Packet        64             128             256             512
> Branch	Master  Patch	Master  Patch	Master	Patch   Master  Patch	
> case 1  26100   222000  26190   217930  23890   199000  30370   212440 (min latency ns)
>          1239100 906910  1168740 691040  575470  574240  724360  734050 (max latency ns)
>          1189501	763908  913602  662941  486187  440482  470060  479376 (avg latency ns)
>
>   
>             1024              1280             1518
>         Master   Patch     Master Patch      Master Patch
>         28320    189610    26520  220580     23950  200480  (min latency ns)
>         701040   67584670  670390 19783490   685930 747040  (max latency ns)
>         444033   469297    415602 506215     429587 491593  (avg latency ns)
>
>
> case 2: ovs-ofctl add-flow br0 in_port=1,action=output:2
>
> Packet        64             128             256             512
> Branch	Master	Patch	Master	Patch	Master	Patch	Master	Patch	
> case 2  18800   33970   19980   30350  22610    26800   13500   20220
>          506140  596690  363010  363370  544520  541570  549120  77414700
>          459509  473536  254817  256801  287872  287277  290642  301572
>
>
>             1024              1280             1518
>         Master   Patch     Master Patch      Master Patch
>         22530    15850     21350  36020      25970  34300
>         549680   131964240 543390 81549210   552060 98207410
>         292436   294388    285468 305727     295133 300080
>
>
> case 3 is same as case 1 with INTERIM_QUEUE_BURST_THRESHOLD=16, instead of 32.
>
> (w) patch
> case 3    64       128       256    512    1024       1280    1518
>           122700   119890   135200  117530  118900     116640  123710(min)
>           972830   808960   574180  696820  36717550   720500  726790(max)
>           783315   674814   463256  439412  467041     463093  471967(avg)
>
> case 4 is same as case 2 with INTERIM_QUEUE_BURST_THRESHOLD=16, instead of 32.
>
> (w) patch
> case 4    64       128       256    512    1024       1280      1518
>           31750    26140    25250   17570   14750      28600     31460(min ns)
>           722690   363200   539760  538320  301845040  12556210  132114800(max ns)
>           485710   253497   285589  284095  293189     282834    285829(avg ns)
>
> v1->v2
>    * xps_get_tx_qid() is no more called twice. The last used qid is stored so
>      the drain function will flush the right queue also when XPS is enabled.
>    * netdev_txq_drain() is called unconditionally and not just for dpdk ports.
>    * txq_drain() takes the 'tx_lock' for queue in case of dynamic tx queues.
>    * Restored counting of dropped packets.
>    * Changed scheduling of drain function.
>    * Updated comments in netdev-provider.h
>    * Fixed a comment in dp-packet.h
>
> Details:
>   * In worst case scenario with fewer packets in batch, significant
>     bottleneck is observed at netdev_dpdk_eth_send() function due to
>     expensive MMIO writes.
>
>   * Also its observed that CPI(cycles per instruction) Rate for the function
>     stood between 3.15 and 4.1 which is significantly higher than acceptable
>     limit of 1.0 for HPC applications and theoretical limit of 0.25 (As Backend
>     pipeline can retire 4 micro-operations in a cycle).
>
>   * With this patch, CPI for netdev_dpdk_eth_send() is at 0.55 and the overall
>     throughput improved significantly.
>
>   lib/dp-packet.h       |  2 +-
>   lib/dpif-netdev.c     | 66 ++++++++++++++++++++++++++++++++++++--
>   lib/netdev-bsd.c      |  1 +
>   lib/netdev-dpdk.c     | 87 ++++++++++++++++++++++++++++++++++++++++++++++-----
>   lib/netdev-dummy.c    |  1 +
>   lib/netdev-linux.c    |  1 +
>   lib/netdev-provider.h |  8 +++++
>   lib/netdev-vport.c    |  3 +-
>   lib/netdev.c          |  9 ++++++
>   lib/netdev.h          |  1 +
>   10 files changed, 166 insertions(+), 13 deletions(-)
>
> diff --git a/lib/dp-packet.h b/lib/dp-packet.h
> index 17b7026..9e3912a 100644
> --- a/lib/dp-packet.h
> +++ b/lib/dp-packet.h
> @@ -39,7 +39,7 @@ enum OVS_PACKED_ENUM dp_packet_source {
>       DPBUF_STACK,               /* Un-movable stack space or static buffer. */
>       DPBUF_STUB,                /* Starts on stack, may expand into heap. */
>       DPBUF_DPDK,                /* buffer data is from DPDK allocated memory.
> -                                * ref to build_dp_packet() in netdev-dpdk. */
> +                                * Ref dp_packet_init_dpdk() in dp-packet.c */
>   };
>   
>   #define DP_PACKET_CONTEXT_SIZE 64
> diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
> index 719a518..b0d47fa 100644
> --- a/lib/dpif-netdev.c
> +++ b/lib/dpif-netdev.c
> @@ -289,6 +289,8 @@ struct dp_netdev_rxq {
>       struct dp_netdev_pmd_thread *pmd;  /* pmd thread that will poll this queue. */
>   };
>   
> +#define LAST_USED_QID_NONE -1
> +
>   /* A port in a netdev-based datapath. */
>   struct dp_netdev_port {
>       odp_port_t port_no;
> @@ -437,8 +439,14 @@ struct rxq_poll {
>   struct tx_port {
>       struct dp_netdev_port *port;
>       int qid;
> -    long long last_used;
> +    long long last_used;        /* In case XPS is enabled, it contains the
> +				 * timestamp of the last time the port was
> +				 * used by the thread to send data.  After
> +				 * XPS_TIMEOUT_MS elapses the qid will be
> +				 * marked as -1. */
>       struct hmap_node node;
> +    int last_used_qid;          /* Last queue id where packets could be
> +                                   enqueued. */
>   };
>   
>   /* PMD: Poll modes drivers.  PMD accesses devices via polling to eliminate
> @@ -2900,6 +2908,25 @@ cycles_count_end(struct dp_netdev_pmd_thread *pmd,
>   }
>   
>   static void
> +dp_netdev_drain_txq_ports(struct dp_netdev_pmd_thread *pmd)
> +{
> +    struct tx_port *cached_tx_port;
> +    int tx_qid;
> +
> +    HMAP_FOR_EACH (cached_tx_port, node, &pmd->send_port_cache) {
> +        tx_qid = cached_tx_port->last_used_qid;
> +
> +        if (tx_qid != LAST_USED_QID_NONE) {
> +            netdev_txq_drain(cached_tx_port->port->netdev, tx_qid,
> +                         cached_tx_port->port->dynamic_txqs);
> +
> +            /* Queue drained and mark it empty. */
> +            cached_tx_port->last_used_qid = LAST_USED_QID_NONE;
> +        }
> +    }
> +}
> +
> +static void
>   dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
>                              struct netdev_rxq *rx,
>                              odp_port_t port_no)
> @@ -3514,15 +3541,18 @@ pmd_load_queues_and_ports(struct dp_netdev_pmd_thread *pmd,
>       return i;
>   }
>   
> +enum { DRAIN_TSC = 20000ULL };
> +
>   static void *
>   pmd_thread_main(void *f_)
>   {
>       struct dp_netdev_pmd_thread *pmd = f_;
> -    unsigned int lc = 0;
> +    unsigned int lc = 0, lc_drain = 0;
>       struct polled_queue *poll_list;
>       bool exiting;
>       int poll_cnt;
>       int i;
> +    uint64_t prev = 0, now = 0;
>   
>       poll_list = NULL;
>   
> @@ -3555,6 +3585,17 @@ reload:
>                                          poll_list[i].port_no);
>           }
>   
> +#define MAX_LOOP_TO_DRAIN 128
Is defining this inline ok?
> +        if (lc_drain++ > MAX_LOOP_TO_DRAIN) {
> +            lc_drain = 0;
> +            prev = now;
> +            now = pmd->last_cycles;
> +
> +            if ((now - prev) > DRAIN_TSC) {
> +                dp_netdev_drain_txq_ports(pmd);
> +            }
> +        }
> +
>           if (lc++ > 1024) {
>               bool reload;
>   
> @@ -3573,6 +3614,9 @@ reload:
>           }
>       }
>   
> +    /* Drain the queues as part of reconfiguration */
> +    dp_netdev_drain_txq_ports(pmd);
> +
>       poll_cnt = pmd_load_queues_and_ports(pmd, &poll_list);
>       exiting = latch_is_set(&pmd->exit_latch);
>       /* Signal here to make sure the pmd finishes
> @@ -3890,6 +3934,7 @@ dp_netdev_add_port_tx_to_pmd(struct dp_netdev_pmd_thread *pmd,
>   
>       tx->port = port;
>       tx->qid = -1;
> +    tx->last_used_qid = LAST_USED_QID_NONE;
>   
>       hmap_insert(&pmd->tx_ports, &tx->node, hash_port_no(tx->port->port_no));
>       pmd->need_reload = true;
> @@ -4454,6 +4499,14 @@ dpif_netdev_xps_get_tx_qid(const struct dp_netdev_pmd_thread *pmd,
>   
>       dpif_netdev_xps_revalidate_pmd(pmd, now, false);
>   
> +    /* The tx queue can change in XPS case, make sure packets in previous
> +     * queue is drained properly. */
> +    if (tx->last_used_qid != LAST_USED_QID_NONE &&
> +               tx->qid != tx->last_used_qid) {
> +        netdev_txq_drain(port->netdev, tx->last_used_qid, port->dynamic_txqs);
> +        tx->last_used_qid = LAST_USED_QID_NONE;
> +    }
> +
>       VLOG_DBG("Core %d: New TX queue ID %d for port \'%s\'.",
>                pmd->core_id, tx->qid, netdev_get_name(tx->port->netdev));
>       return min_qid;
> @@ -4548,6 +4601,13 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
>                   tx_qid = pmd->static_tx_qid;
>               }
>   
> +            /* In case these packets gets buffered into an intermediate
> +             * queue and XPS is enabled the drain function could find a
> +             * different Tx qid assigned to its thread.  We keep track
> +             * of the qid we're now using, that will trigger the drain
> +             * function and will select the right queue to flush. */
> +            p->last_used_qid = tx_qid;
> +
Is there a reason for not doing this inside netdev_send()?
>               netdev_send(p->port->netdev, tx_qid, packets_, may_steal,
>                           dynamic_txqs);
>               return;
> @@ -4960,7 +5020,7 @@ dpif_dummy_register(enum dummy_level level)
>                                "dp port new-number",
>                                3, 3, dpif_dummy_change_port_number, NULL);
>   }
> -
> +
>   /* Datapath Classifier. */
>   
>   /* A set of rules that all have the same fields wildcarded. */
> diff --git a/lib/netdev-bsd.c b/lib/netdev-bsd.c
> index 94c515d..00d5263 100644
> --- a/lib/netdev-bsd.c
> +++ b/lib/netdev-bsd.c
> @@ -1547,6 +1547,7 @@ netdev_bsd_update_flags(struct netdev *netdev_, enum netdev_flags off,
>       netdev_bsd_rxq_recv,                             \
>       netdev_bsd_rxq_wait,                             \
>       netdev_bsd_rxq_drain,                            \
> +    NULL,                                            \
>   }
>   
>   const struct netdev_class netdev_bsd_class =
> diff --git a/lib/netdev-dpdk.c b/lib/netdev-dpdk.c
> index 94568a1..3def755 100644
> --- a/lib/netdev-dpdk.c
> +++ b/lib/netdev-dpdk.c
> @@ -166,7 +166,6 @@ static const struct rte_eth_conf port_conf = {
>   
>   enum { DPDK_RING_SIZE = 256 };
>   BUILD_ASSERT_DECL(IS_POW2(DPDK_RING_SIZE));
> -enum { DRAIN_TSC = 200000ULL };
>   
>   enum dpdk_dev_type {
>       DPDK_DEV_ETH = 0,
> @@ -286,15 +285,26 @@ struct dpdk_mp {
>       struct ovs_list list_node OVS_GUARDED_BY(dpdk_mp_mutex);
>   };
>   
> +/* Queue 'INTERIM_QUEUE_BURST_THRESHOLD' packets before tranmitting.
> + * Defaults to 'NETDEV_MAX_BURST'(32) now.
> + */
> +#define INTERIM_QUEUE_BURST_THRESHOLD NETDEV_MAX_BURST
> +
>   /* There should be one 'struct dpdk_tx_queue' created for
>    * each cpu core. */
>   struct dpdk_tx_queue {
> +    int count;                     /* Number of buffered packets waiting to
> +                                      be sent. */
>       rte_spinlock_t tx_lock;        /* Protects the members and the NIC queue
>                                       * from concurrent access.  It is used only
>                                       * if the queue is shared among different
>                                       * pmd threads (see 'concurrent_txq'). */
>       int map;                       /* Mapping of configured vhost-user queues
>                                       * to enabled by guest. */
> +    struct rte_mbuf *burst_pkts[INTERIM_QUEUE_BURST_THRESHOLD];
> +                                   /* Intermediate queues where packets can
> +                                    * be buffered to amortize the cost of MMIO
> +                                    * writes. */
>   };
>   
>   /* dpdk has no way to remove dpdk ring ethernet devices
> @@ -1381,6 +1391,7 @@ static inline int
>   netdev_dpdk_eth_tx_burst(struct netdev_dpdk *dev, int qid,
>                            struct rte_mbuf **pkts, int cnt)
>   {
> +    struct dpdk_tx_queue *txq = &dev->tx_q[qid];
>       uint32_t nb_tx = 0;
>   
>       while (nb_tx != cnt) {
> @@ -1404,6 +1415,7 @@ netdev_dpdk_eth_tx_burst(struct netdev_dpdk *dev, int qid,
>           }
>       }
>   
> +    txq->count = 0;
>       return cnt - nb_tx;
>   }
>   
> @@ -1788,12 +1800,42 @@ dpdk_do_tx_copy(struct netdev *netdev, int qid, struct dp_packet_batch *batch)
>       }
>   }
>   
> +/* Enqueue packets in an intermediate queue and call the burst
> + * function when the queue is full.  This way we can amortize the
> + * cost of MMIO writes. */
> +static inline int
> +netdev_dpdk_eth_tx_queue(struct netdev_dpdk *dev, int qid,
> +                            struct rte_mbuf **pkts, int cnt)
> +{
> +    struct dpdk_tx_queue *txq = &dev->tx_q[qid];
> +
> +    int i = 0;
> +    int dropped = 0;
> +
> +    while (i < cnt) {
> +        int freeslots = INTERIM_QUEUE_BURST_THRESHOLD - txq->count;
> +        int tocopy = MIN(freeslots, cnt-i);
> +
> +        memcpy(&txq->burst_pkts[txq->count], &pkts[i],
> +               tocopy * sizeof (struct rte_mbuf *));
> +
> +        txq->count += tocopy;
> +        i += tocopy;
> +
> +        /* Queue full, burst the packets */
> +        if (txq->count >= INTERIM_QUEUE_BURST_THRESHOLD) {
> +           dropped += netdev_dpdk_eth_tx_burst(dev, qid, txq->burst_pkts,
> +                   txq->count);
> +        }
> +    }
> +    return dropped;
> +}
> +
>   static int
>   netdev_dpdk_vhost_send(struct netdev *netdev, int qid,
>                          struct dp_packet_batch *batch,
>                          bool may_steal, bool concurrent_txq OVS_UNUSED)
>   {
> -
>       if (OVS_UNLIKELY(!may_steal || batch->packets[0]->source != DPBUF_DPDK)) {
>           dpdk_do_tx_copy(netdev, qid, batch);
>           dp_packet_delete_batch(batch, may_steal);
> @@ -1836,7 +1878,7 @@ netdev_dpdk_send__(struct netdev_dpdk *dev, int qid,
>           cnt = netdev_dpdk_qos_run(dev, pkts, cnt);
>           dropped = batch->count - cnt;
>   
> -        dropped += netdev_dpdk_eth_tx_burst(dev, qid, pkts, cnt);
> +        dropped += netdev_dpdk_eth_tx_queue(dev, qid, pkts, cnt);
>   
>           if (OVS_UNLIKELY(dropped)) {
>               rte_spinlock_lock(&dev->stats_lock);
> @@ -1850,6 +1892,30 @@ netdev_dpdk_send__(struct netdev_dpdk *dev, int qid,
>       }
>   }
>   
> +/* Drain tx queues, this is called periodically to empty the
> + * intermediate queue in case of few packets (< INTERIM_QUEUE_BURST_THRESHOLD)
> + * are buffered into the queue. */
> +static int
> +netdev_dpdk_txq_drain(struct netdev *netdev, int qid, bool concurrent_txq)
> +{
> +    struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
> +    struct dpdk_tx_queue *txq = &dev->tx_q[qid];
> +
> +    if (OVS_LIKELY(txq->count)) {
> +        if (OVS_UNLIKELY(concurrent_txq)) {
> +            qid = qid % dev->up.n_txq;
> +            rte_spinlock_lock(&dev->tx_q[qid].tx_lock);
> +        }
> +
> +        netdev_dpdk_eth_tx_burst(dev, qid, txq->burst_pkts, txq->count);
> +
> +        if (OVS_UNLIKELY(concurrent_txq)) {
> +            rte_spinlock_unlock(&dev->tx_q[qid].tx_lock);
> +        }
> +    }
> +    return 0;
> +}
> +
>   static int
>   netdev_dpdk_eth_send(struct netdev *netdev, int qid,
>                        struct dp_packet_batch *batch, bool may_steal,
> @@ -3243,7 +3309,7 @@ unlock:
>                             SET_CONFIG, SET_TX_MULTIQ, SEND,    \
>                             GET_CARRIER, GET_STATS,             \
>                             GET_FEATURES, GET_STATUS,           \
> -                          RECONFIGURE, RXQ_RECV)              \
> +                          RECONFIGURE, RXQ_RECV, TXQ_DRAIN)   \
>   {                                                             \
>       NAME,                                                     \
>       true,                       /* is_pmd */                  \
> @@ -3310,6 +3376,7 @@ unlock:
>       RXQ_RECV,                                                 \
>       NULL,                       /* rx_wait */                 \
>       NULL,                       /* rxq_drain */               \
> +    TXQ_DRAIN,      /* txq_drain */                           \
>   }
>   
>   static const struct netdev_class dpdk_class =
> @@ -3326,7 +3393,8 @@ static const struct netdev_class dpdk_class =
>           netdev_dpdk_get_features,
>           netdev_dpdk_get_status,
>           netdev_dpdk_reconfigure,
> -        netdev_dpdk_rxq_recv);
> +        netdev_dpdk_rxq_recv,
> +        netdev_dpdk_txq_drain);
>   
>   static const struct netdev_class dpdk_ring_class =
>       NETDEV_DPDK_CLASS(
> @@ -3342,7 +3410,8 @@ static const struct netdev_class dpdk_ring_class =
>           netdev_dpdk_get_features,
>           netdev_dpdk_get_status,
>           netdev_dpdk_reconfigure,
> -        netdev_dpdk_rxq_recv);
> +        netdev_dpdk_rxq_recv,
> +        NULL);
>   
>   static const struct netdev_class dpdk_vhost_class =
>       NETDEV_DPDK_CLASS(
> @@ -3358,7 +3427,8 @@ static const struct netdev_class dpdk_vhost_class =
>           NULL,
>           NULL,
>           netdev_dpdk_vhost_reconfigure,
> -        netdev_dpdk_vhost_rxq_recv);
> +        netdev_dpdk_vhost_rxq_recv,
> +        NULL);
We need this patch even more in the vhost case, as there is an even bigger
drop in performance when we exceed the rx batch size. I measured around 40%
when reducing the rx batch size to 4 and using 1 vs 5 flows (single PMD).

>   static const struct netdev_class dpdk_vhost_client_class =
>       NETDEV_DPDK_CLASS(
>           "dpdkvhostuserclient",
> @@ -3373,7 +3443,8 @@ static const struct netdev_class dpdk_vhost_client_class =
>           NULL,
>           NULL,
>           netdev_dpdk_vhost_client_reconfigure,
> -        netdev_dpdk_vhost_rxq_recv);
> +        netdev_dpdk_vhost_rxq_recv,
> +        NULL);
>   
>   void
>   netdev_dpdk_register(void)
> diff --git a/lib/netdev-dummy.c b/lib/netdev-dummy.c
> index 0657434..4ef659e 100644
> --- a/lib/netdev-dummy.c
> +++ b/lib/netdev-dummy.c
> @@ -1409,6 +1409,7 @@ netdev_dummy_update_flags(struct netdev *netdev_,
>       netdev_dummy_rxq_recv,                                      \
>       netdev_dummy_rxq_wait,                                      \
>       netdev_dummy_rxq_drain,                                     \
> +    NULL,                                                       \
>   }
>   
>   static const struct netdev_class dummy_class =
> diff --git a/lib/netdev-linux.c b/lib/netdev-linux.c
> index 9ff1333..79478ee 100644
> --- a/lib/netdev-linux.c
> +++ b/lib/netdev-linux.c
> @@ -2830,6 +2830,7 @@ netdev_linux_update_flags(struct netdev *netdev_, enum netdev_flags off,
>       netdev_linux_rxq_recv,                                      \
>       netdev_linux_rxq_wait,                                      \
>       netdev_linux_rxq_drain,                                     \
> +    NULL,                                                       \
>   }
>   
>   const struct netdev_class netdev_linux_class =
> diff --git a/lib/netdev-provider.h b/lib/netdev-provider.h
> index 8346fc4..97e72c6 100644
> --- a/lib/netdev-provider.h
> +++ b/lib/netdev-provider.h
> @@ -335,6 +335,11 @@ struct netdev_class {
>        * If the function returns a non-zero value, some of the packets might have
>        * been sent anyway.
>        *
> +     * Some netdev provider - like in case of 'dpdk' - may buffer the batch
> +     * of packets into an intermediate queue.  Buffered packets will be sent
> +     * out when their number will exceed a threshold or by the periodic call
> +     * to the drain function.
> +     *
>        * If 'may_steal' is false, the caller retains ownership of all the
>        * packets.  If 'may_steal' is true, the caller transfers ownership of all
>        * the packets to the network device, regardless of success.
> @@ -769,6 +774,9 @@ struct netdev_class {
>   
>       /* Discards all packets waiting to be received from 'rx'. */
>       int (*rxq_drain)(struct netdev_rxq *rx);
> +
> +    /* Drain all packets waiting to be sent on queue 'qid'. */
> +    int (*txq_drain)(struct netdev *netdev, int qid, bool concurrent_txq);
>   };
>   
>   int netdev_register_provider(const struct netdev_class *);
> diff --git a/lib/netdev-vport.c b/lib/netdev-vport.c
> index 2d0aa43..64cf617 100644
> --- a/lib/netdev-vport.c
> +++ b/lib/netdev-vport.c
> @@ -847,7 +847,8 @@ get_stats(const struct netdev *netdev, struct netdev_stats *stats)
>       NULL,                   /* rx_dealloc */                \
>       NULL,                   /* rx_recv */                   \
>       NULL,                   /* rx_wait */                   \
> -    NULL,                   /* rx_drain */
> +    NULL,                   /* rx_drain */                  \
> +    NULL,                   /* tx_drain */
>   
>   
>   #define TUNNEL_CLASS(NAME, DPIF_PORT, BUILD_HEADER, PUSH_HEADER, POP_HEADER)   \
> diff --git a/lib/netdev.c b/lib/netdev.c
> index 1e6bb2b..5e0c53f 100644
> --- a/lib/netdev.c
> +++ b/lib/netdev.c
> @@ -670,6 +670,15 @@ netdev_rxq_drain(struct netdev_rxq *rx)
>               : 0);
>   }
>   
> +/* Flush packets on the queue 'qid'. */
> +int
> +netdev_txq_drain(struct netdev *netdev, int qid, bool netdev_txq_drain)
> +{
> +    return (netdev->netdev_class->txq_drain
> +            ? netdev->netdev_class->txq_drain(netdev, qid, netdev_txq_drain)
> +            : EOPNOTSUPP);
> +}
> +
>   /* Configures the number of tx queues of 'netdev'. Returns 0 if successful,
>    * otherwise a positive errno value.
>    *
> diff --git a/lib/netdev.h b/lib/netdev.h
> index d6c07c1..7ddd790 100644
> --- a/lib/netdev.h
> +++ b/lib/netdev.h
> @@ -155,6 +155,7 @@ int netdev_rxq_drain(struct netdev_rxq *);
>   int netdev_send(struct netdev *, int qid, struct dp_packet_batch *,
>                   bool may_steal, bool concurrent_txq);
>   void netdev_send_wait(struct netdev *, int qid);
> +int netdev_txq_drain(struct netdev *, int qid, bool concurrent_txq);
>   
>   /* native tunnel APIs */
>   /* Structure to pass parameters required to build a tunnel header. */
Bodireddy, Bhanuprakash April 18, 2017, 9:44 p.m. UTC | #3
Hi Eelco,

Please find my comments inline. 

>
>Hi Bhanuprakash,
>
>I was doing some Physical to Virtual tests, and whenever the number of flows
>reaches the rx batch size performance dropped a lot. I created an
>experimental patch where I added an intermediate queue and flush it at the
>end of the rx batch.
>
>When I found your patch I decided to give it a try to see how it behaves.
>I also modified your patch in such a way that it will flush the queue after every
>call to dp_netdev_process_rxq_port().

I presume you were doing something like the below in the pmd_thread_main receive loop?

for (i = 0; i < poll_cnt; i++) {
    dp_netdev_process_rxq_port(pmd, poll_list[i].rx,
                               poll_list[i].port_no);
    dp_netdev_drain_txq_ports(pmd);
}

>
>Here are some pkt forwarding stats for the Physical to Physical scenario, for
>two 82599ES 10G port with 64 byte packets being send at wire speed:
>
>Number      plain                patch +
>of flows  git clone    patch      flush
>========  =========  =========  =========
>   10       10727283   13527752   13393844
>   32        7042253   11285572     11228799
>   50        7515491    9642650      9607791
>  100        5838699    9461239      9430730
>  500        5285066    7859123      7845807
>1000        5226477    7146404      7135601

Thanks for sharing the numbers. I do agree with your findings and I saw very similar results with our v3 patch.
In any case we see a significant throughput improvement with the patch.

>
>
>I do not have an IXIA to do the latency tests you performed, however I do
>have a XENA tester which has a basic latency measurement feature.
>I used the following script to get the latency numbers:
>
>https://github.com/chaudron/XenaPythonLib/blob/latency/examples/latenc
>y.py

Thanks for pointing this out; it could be useful for users without an IXIA setup.

>
>
>As you can see in the numbers below, the default queue introduces quite
>some latency, however doing the flush every rx batch brings the latency down
>to almost the original values. The results mimic your test case 2, sending 10G
>traffic @ wire speed:
>
>   ===== GIT CLONE
>   Pkt size  min(ns)  avg(ns)  max(ns)
>    512      4,631      5,022    309,914
>   1024      5,545      5,749    104,294
>   1280      5,978      6,159     45,306
>   1518      6,419      6,774    946,850
>
>   ===== PATCH
>   Pkt size  min(ns)  avg(ns)  max(ns)
>    512      4,928    492,228  1,995,026
>   1024      5,761    499,206  2,006,628
>   1280      6,186    497,975  1,986,175
>   1518      6,579    494,434  2,005,947
>
>   ===== PATCH + FLUSH
>   Pkt size  min(ns)  avg(ns)  max(ns)
>    512      4,711      5,064    182,477
>   1024      5,601      5,888    701,654
>   1280      6,018      6,491    533,037
>   1518      6,467      6,734    312,471

The latency numbers above are very encouraging indeed. However, with RFC2544 tests, especially on IXIA, we have a lot of parameters to tune.
I see that the latency stats fluctuate a lot with changes in the acceptable 'Frame Loss'.  I am not an IXIA expert myself, but I am trying to figure out acceptable
settings and to measure latency/throughput.

>
>Maybe it will be good to re-run your latency tests with the flush for every rx
>batch. This might get rid of your huge latency while still increasing the
>performance in the case the rx batch shares the same egress port.
>
>The overall patchset looks fine to me, see some comments inline.
Thanks for reviewing the patch.

>>
>> +#define MAX_LOOP_TO_DRAIN 128
>Is defining this inline ok?
I see that this convention is used in ovs. 

>>           NULL,
>>           NULL,
>>           netdev_dpdk_vhost_reconfigure,
>> -        netdev_dpdk_vhost_rxq_recv);
>> +        netdev_dpdk_vhost_rxq_recv,
>> +        NULL);
>We need this patch even more in the vhost case as there is an even bigger
>drop in performance when we exceed the rx batch size. I measured around
>40%, when reducing the rx batch size to 4, and using 1 vs 5 flows (single PMD).

Completely agree. In fact we did a quick patch doing batching for vhost ports as well and found a significant performance improvement (though it's not thoroughly tested for all corner cases).
We have that in our backlog and we will try posting that patch as an RFC at least to get feedback from the community.
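
Just to sketch the direction (this is only a rough illustration, not the actual backlog patch; netdev_dpdk_vhost_tx_queue()/netdev_dpdk_vhost_tx_burst() are placeholder names and the reuse of 'burst_pkts'/'count' is an assumption), the vhost path could mirror netdev_dpdk_eth_tx_queue():

    /* Rough sketch: buffer vhost tx packets per txq and burst once the
     * intermediate queue fills up, mirroring the eth path.
     * netdev_dpdk_vhost_tx_burst() is a placeholder for the routine that
     * actually hands the mbufs to the guest and resets txq->count. */
    static inline int
    netdev_dpdk_vhost_tx_queue(struct netdev_dpdk *dev, int qid,
                               struct rte_mbuf **pkts, int cnt)
    {
        struct dpdk_tx_queue *txq = &dev->tx_q[qid];
        int i = 0;
        int dropped = 0;

        while (i < cnt) {
            int freeslots = INTERIM_QUEUE_BURST_THRESHOLD - txq->count;
            int tocopy = MIN(freeslots, cnt - i);

            memcpy(&txq->burst_pkts[txq->count], &pkts[i],
                   tocopy * sizeof(struct rte_mbuf *));
            txq->count += tocopy;
            i += tocopy;

            if (txq->count >= INTERIM_QUEUE_BURST_THRESHOLD) {
                dropped += netdev_dpdk_vhost_tx_burst(dev, qid,
                                                      txq->burst_pkts,
                                                      txq->count);
            }
        }
        return dropped;
    }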

-Bhanuprakash.
Eelco Chaudron April 19, 2017, 1:06 p.m. UTC | #4
On 18/04/17 23:44, Bodireddy, Bhanuprakash wrote:
>> Hi Bhanuprakash,
>>
>> I was doing some Physical to Virtual tests, and whenever the number of flows
>> reaches the rx batch size performance dropped a lot. I created an
>> experimental patch where I added an intermediate queue and flush it at the
>> end of the rx batch.
>>
>> When I found your patch I decided to give it a try to see how it behaves.
>> I also modified your patch in such a way that it will flush the queue after every
>> call to dp_netdev_process_rxq_port().
> I presume you were doing something like the below in the pmd_thread_main receive loop?
>
> for (i = 0; i < poll_cnt; i++) {
>     dp_netdev_process_rxq_port(pmd, poll_list[i].rx,
>                                poll_list[i].port_no);
>     dp_netdev_drain_txq_ports(pmd);
> }
Yes, this is exactly what I did. It would be interesting to see what IXIA thinks
of this change ;)
>> Here are some pkt forwarding stats for the Physical to Physical scenario, for
>> two 82599ES 10G port with 64 byte packets being send at wire speed:
>>
>> Number      plain                patch +
>> of flows  git clone    patch      flush
>> ========  =========  =========  =========
>>    10       10727283   13527752   13393844
>>    32        7042253   11285572     11228799
>>    50        7515491    9642650      9607791
>>   100        5838699    9461239      9430730
>>   500        5285066    7859123      7845807
>> 1000        5226477    7146404      7135601
> Thanks for sharing the numbers. I do agree with your findings and I saw very similar results with our v3 patch.
> In any case we see a significant throughput improvement with the patch.
>
>>
>> I do not have an IXIA to do the latency tests you performed, however I do
>> have a XENA tester which has a basic latency measurement feature.
>> I used the following script to get the latency numbers:
>>
>> https://github.com/chaudron/XenaPythonLib/blob/latency/examples/latenc
>> y.py
> Thanks for pointing this out; it could be useful for users without an IXIA setup.
>
>>
>> As you can see in the numbers below, the default queue introduces quite
>> some latency, however doing the flush every rx batch brings the latency down
>> to almost the original values. The results mimic your test case 2, sending 10G
>> traffic @ wire speed:
>>
>>    ===== GIT CLONE
>>    Pkt size  min(ns)  avg(ns)  max(ns)
>>     512      4,631      5,022    309,914
>>    1024      5,545      5,749    104,294
>>    1280      5,978      6,159     45,306
>>    1518      6,419      6,774    946,850
>>
>>    ===== PATCH
>>    Pkt size  min(ns)  avg(ns)  max(ns)
>>     512      4,928    492,228  1,995,026
>>    1024      5,761    499,206  2,006,628
>>    1280      6,186    497,975  1,986,175
>>    1518      6,579    494,434  2,005,947
>>
>>    ===== PATCH + FLUSH
>>    Pkt size  min(ns)  avg(ns)  max(ns)
>>     512      4,711      5,064    182,477
>>    1024      5,601      5,888    701,654
>>    1280      6,018      6,491    533,037
>>    1518      6,467      6,734    312,471
> The latency numbers above are very encouraging indeed. However, with RFC2544 tests, especially on IXIA, we have a lot of parameters to tune.
> I see that the latency stats fluctuate a lot with changes in the acceptable 'Frame Loss'.  I am not an IXIA expert myself, but I am trying to figure out acceptable
> settings and to measure latency/throughput.
I just figured out that XENA also has the RFC2544 tests, and I decided to give it a shot.
I also noticed that if packets get dropped the results are really off. In the end I did
the tests at 99% of wire speed @10G; no packets got lost and the results are stable.
Here are the results for test 2, 30 flows, 512 byte packets:

          Avg     Min     Max
PLAIN    15.397   5.288  880.598
PATCH    28.521  11.358  925.001
FLUSH    15.958   5.352  917.889

>> Maybe it will be good to re-run your latency tests with the flush for every rx
>> batch. This might get rid of your huge latency while still increasing the
>> performance in the case the rx batch shares the same egress port.
>>
>> The overall patchset looks fine to me, see some comments inline.
> Thanks for reviewing the patch.
>
>>> +#define MAX_LOOP_TO_DRAIN 128
>> Is defining this inline ok?
> I see that this convention is used in ovs.
>
>>>            NULL,
>>>            NULL,
>>>            netdev_dpdk_vhost_reconfigure,
>>> -        netdev_dpdk_vhost_rxq_recv);
>>> +        netdev_dpdk_vhost_rxq_recv,
>>> +        NULL);
>> We need this patch even more in the vhost case as there is an even bigger
>> drop in performance when we exceed the rx batch size. I measured around
>> 40%, when reducing the rx batch size to 4, and using 1 vs 5 flows (single PMD).
> Completely agree. In fact we did a quick patch doing batching for vhost ports as well and found a significant performance improvement (though it's not thoroughly tested for all corner cases).
> We have that in our backlog and we will try posting that patch as an RFC at least to get feedback from the community.
Thanks! Looking forward to it. I will definitely review and test it!
> -Bhanuprakash.
Ciara Loftus May 26, 2017, 2:50 p.m. UTC | #5
> 
> After packet classification, packets are queued in to batches depending
> on the matching netdev flow. Thereafter each batch is processed to
> execute the related actions. This becomes particularly inefficient if
> there are few packets in each batch as rte_eth_tx_burst() incurs expensive
> MMIO writes.
> 
> This commit adds back intermediate queue implementation. Packets are
> queued and burst when the packet count exceeds threshold. Also drain
> logic is refactored to handle packets hanging in the tx queues. Testing
> shows significant performance gains with this implementation.
> 
> Fixes: b59cc14e032d("netdev-dpdk: Use instant sending instead of
> queueing of packets.")
> Signed-off-by: Bhanuprakash Bodireddy
> <bhanuprakash.bodireddy@intel.com>>
> Signed-off-by: Antonio Fischetti <antonio.fischetti@intel.com>
> Co-authored-by: Antonio Fischetti <antonio.fischetti@intel.com>
> Signed-off-by: Markus Magnusson <markus.magnusson@ericsson.com>
> Co-authored-by: Markus Magnusson <markus.magnusson@ericsson.com>
> ---
> v2->v3
>   * Refactor the code
>   * Use thread local copy 'send_port_cache' instead of 'tx_port' while draining
>   * Invoke dp_netdev_drain_txq_port() to drain the packets from the queue
> as
>     part of pmd reconfiguration that gets triggered due to port
> addition/deletion
>     or change in pmd-cpu-mask.
>   * Invoke netdev_txq_drain() from xps_get_tx_qid() to drain packets in old
>     queue. This is possible in XPS case where the tx queue can change after
>     timeout.
>   * Fix another bug in netdev_dpdk_eth_tx_burst() w.r.t 'txq->count'.
> 
> Latency stats:
> Collected the latency stats with PHY2PHY loopback case using 30 IXIA streams
> /UDP packets/uni direction traffic. All the stats are in nanoseconds. Results
> below compare latency results between Master vs patch.
> 
> case 1: Matching IP flow rules for each IXIA stream
>         Eg:  For an IXIA stream with src Ip: 2.2.2.1, dst tip: 5.5.5.1
>         ovs-ofctl add-flow br0 dl_type=0x0800,nw_src=2.2.2.1,actions=output:2
> 
>         For an IXIA stream with src Ip: 4.4.4.1, dst tip: 15.15.15.1
>         ovs-ofctl add-flow br0 dl_type=0x0800,nw_src=4.4.4.1,actions=output:2
> 
> Packet        64             128             256             512
> Branch	Master  Patch	Master  Patch	Master	Patch   Master  Patch
> case 1  26100   222000  26190   217930  23890   199000  30370   212440 (min
> latency ns)
>         1239100 906910  1168740 691040  575470  574240  724360  734050 (max
> latency ns)
>         1189501	763908  913602  662941  486187  440482  470060  479376 (avg
> latency ns)
> 
> 
>            1024              1280             1518
>        Master   Patch     Master Patch      Master Patch
>        28320    189610    26520  220580     23950  200480  (min latency ns)
>        701040   67584670  670390 19783490   685930 747040  (max latency ns)
>        444033   469297    415602 506215     429587 491593  (avg latency ns)
> 
> 
> case 2: ovs-ofctl add-flow br0 in_port=1,action=output:2
> 
> Packet        64             128             256             512
> Branch	Master	Patch	Master	Patch	Master	Patch	Master	Patch
> case 2  18800   33970   19980   30350  22610    26800   13500   20220
>         506140  596690  363010  363370  544520  541570  549120  77414700
>         459509  473536  254817  256801  287872  287277  290642  301572
> 
> 
>            1024              1280             1518
>        Master   Patch     Master Patch      Master Patch
>        22530    15850     21350  36020      25970  34300
>        549680   131964240 543390 81549210   552060 98207410
>        292436   294388    285468 305727     295133 300080
> 
> 
> case 3 is same as case 1 with INTERIM_QUEUE_BURST_THRESHOLD=16,
> instead of 32.
> 
> (w) patch
> case 3    64       128       256    512    1024       1280    1518
>          122700   119890   135200  117530  118900     116640  123710(min)
>          972830   808960   574180  696820  36717550   720500  726790(max)
>          783315   674814   463256  439412  467041     463093  471967(avg)
> 
> case 4 is same as case 2 with INTERIM_QUEUE_BURST_THRESHOLD=16,
> instead of 32.
> 
> (w) patch
> case 4    64       128       256    512    1024       1280      1518
>          31750    26140    25250   17570   14750      28600     31460(min ns)
>          722690   363200   539760  538320  301845040  12556210  132114800(max
> ns)
>          485710   253497   285589  284095  293189     282834    285829(avg ns)
> 
> v1->v2
>   * xps_get_tx_qid() is no more called twice. The last used qid is stored so
>     the drain function will flush the right queue also when XPS is enabled.
>   * netdev_txq_drain() is called unconditionally and not just for dpdk ports.
>   * txq_drain() takes the 'tx_lock' for queue in case of dynamic tx queues.
>   * Restored counting of dropped packets.
>   * Changed scheduling of drain function.
>   * Updated comments in netdev-provider.h
>   * Fixed a comment in dp-packet.h
> 
> Details:
>  * In worst case scenario with fewer packets in batch, significant
>    bottleneck is observed at netdev_dpdk_eth_send() function due to
>    expensive MMIO writes.
> 
>  * Also its observed that CPI(cycles per instruction) Rate for the function
>    stood between 3.15 and 4.1 which is significantly higher than acceptable
>    limit of 1.0 for HPC applications and theoretical limit of 0.25 (As Backend
>    pipeline can retire 4 micro-operations in a cycle).
> 
>  * With this patch, CPI for netdev_dpdk_eth_send() is at 0.55 and the overall
>    throughput improved significantly.
> 
>  lib/dp-packet.h       |  2 +-
>  lib/dpif-netdev.c     | 66 ++++++++++++++++++++++++++++++++++++--
>  lib/netdev-bsd.c      |  1 +
>  lib/netdev-dpdk.c     | 87
> ++++++++++++++++++++++++++++++++++++++++++++++-----
>  lib/netdev-dummy.c    |  1 +
>  lib/netdev-linux.c    |  1 +
>  lib/netdev-provider.h |  8 +++++
>  lib/netdev-vport.c    |  3 +-
>  lib/netdev.c          |  9 ++++++
>  lib/netdev.h          |  1 +
>  10 files changed, 166 insertions(+), 13 deletions(-)
> 
> diff --git a/lib/dp-packet.h b/lib/dp-packet.h
> index 17b7026..9e3912a 100644
> --- a/lib/dp-packet.h
> +++ b/lib/dp-packet.h
> @@ -39,7 +39,7 @@ enum OVS_PACKED_ENUM dp_packet_source {
>      DPBUF_STACK,               /* Un-movable stack space or static buffer. */
>      DPBUF_STUB,                /* Starts on stack, may expand into heap. */
>      DPBUF_DPDK,                /* buffer data is from DPDK allocated memory.
> -                                * ref to build_dp_packet() in netdev-dpdk. */
> +                                * Ref dp_packet_init_dpdk() in dp-packet.c */
>  };
> 
>  #define DP_PACKET_CONTEXT_SIZE 64
> diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
> index 719a518..b0d47fa 100644
> --- a/lib/dpif-netdev.c
> +++ b/lib/dpif-netdev.c
> @@ -289,6 +289,8 @@ struct dp_netdev_rxq {
>      struct dp_netdev_pmd_thread *pmd;  /* pmd thread that will poll this
> queue. */
>  };
> 
> +#define LAST_USED_QID_NONE -1
> +
>  /* A port in a netdev-based datapath. */
>  struct dp_netdev_port {
>      odp_port_t port_no;
> @@ -437,8 +439,14 @@ struct rxq_poll {
>  struct tx_port {
>      struct dp_netdev_port *port;
>      int qid;
> -    long long last_used;
> +    long long last_used;        /* In case XPS is enabled, it contains the
> +				 * timestamp of the last time the port was
> +				 * used by the thread to send data.  After
> +				 * XPS_TIMEOUT_MS elapses the qid will be
> +				 * marked as -1. */

Replace tabs with spaces in the comment above.
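i.e. keep the text as-is but indent the continuation lines with spaces only (columns approximate), for example:

    long long last_used;        /* In case XPS is enabled, it contains the
                                 * timestamp of the last time the port was
                                 * used by the thread to send data.  After
                                 * XPS_TIMEOUT_MS elapses the qid will be
                                 * marked as -1. */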

>      struct hmap_node node;
> +    int last_used_qid;          /* Last queue id where packets could be
> +                                   enqueued. */
>  };
> 
>  /* PMD: Poll modes drivers.  PMD accesses devices via polling to eliminate
> @@ -2900,6 +2908,25 @@ cycles_count_end(struct dp_netdev_pmd_thread
> *pmd,
>  }
> 
>  static void
> +dp_netdev_drain_txq_ports(struct dp_netdev_pmd_thread *pmd)
> +{
> +    struct tx_port *cached_tx_port;
> +    int tx_qid;
> +
> +    HMAP_FOR_EACH (cached_tx_port, node, &pmd->send_port_cache) {
> +        tx_qid = cached_tx_port->last_used_qid;
> +
> +        if (tx_qid != LAST_USED_QID_NONE) {
> +            netdev_txq_drain(cached_tx_port->port->netdev, tx_qid,
> +                         cached_tx_port->port->dynamic_txqs);
> +
> +            /* Queue drained and mark it empty. */
> +            cached_tx_port->last_used_qid = LAST_USED_QID_NONE;
> +        }
> +    }
> +}
> +
> +static void
>  dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
>                             struct netdev_rxq *rx,
>                             odp_port_t port_no)
> @@ -3514,15 +3541,18 @@ pmd_load_queues_and_ports(struct
> dp_netdev_pmd_thread *pmd,
>      return i;
>  }
> 
> +enum { DRAIN_TSC = 20000ULL };
> +
>  static void *
>  pmd_thread_main(void *f_)
>  {
>      struct dp_netdev_pmd_thread *pmd = f_;
> -    unsigned int lc = 0;
> +    unsigned int lc = 0, lc_drain = 0;
>      struct polled_queue *poll_list;
>      bool exiting;
>      int poll_cnt;
>      int i;
> +    uint64_t prev = 0, now = 0;
> 
>      poll_list = NULL;
> 
> @@ -3555,6 +3585,17 @@ reload:
>                                         poll_list[i].port_no);
>          }
> 
> +#define MAX_LOOP_TO_DRAIN 128

MAX_LOOP_TO_DRAIN and DRAIN_TSC should be moved somewhere more appropriate, perhaps towards the beginning of the file, and should be annotated to explain what their values mean.

Is there a need for both, or can the desired behaviour be achieved with a single value? Maybe not; just something to consider to reduce the complexity.
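
For instance, something like the below might be enough (just a sketch reusing the names already in the patch; whether dropping the loop counter costs anything would need measuring):

    /* Sketch: drive the drain purely off the cycle delta, without the
     * MAX_LOOP_TO_DRAIN counter. */
    now = pmd->last_cycles;
    if ((now - prev) > DRAIN_TSC) {
        dp_netdev_drain_txq_ports(pmd);
        prev = now;
    }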

> +        if (lc_drain++ > MAX_LOOP_TO_DRAIN) {
> +            lc_drain = 0;
> +            prev = now;
> +            now = pmd->last_cycles;
> +
> +            if ((now - prev) > DRAIN_TSC) {
> +                dp_netdev_drain_txq_ports(pmd);
> +            }
> +        }
> +
>          if (lc++ > 1024) {
>              bool reload;
> 
> @@ -3573,6 +3614,9 @@ reload:
>          }
>      }
> 
> +    /* Drain the queues as part of reconfiguration */
> +    dp_netdev_drain_txq_ports(pmd);
> +
>      poll_cnt = pmd_load_queues_and_ports(pmd, &poll_list);
>      exiting = latch_is_set(&pmd->exit_latch);
>      /* Signal here to make sure the pmd finishes
> @@ -3890,6 +3934,7 @@ dp_netdev_add_port_tx_to_pmd(struct
> dp_netdev_pmd_thread *pmd,
> 
>      tx->port = port;
>      tx->qid = -1;
> +    tx->last_used_qid = LAST_USED_QID_NONE;
> 
>      hmap_insert(&pmd->tx_ports, &tx->node, hash_port_no(tx->port-
> >port_no));
>      pmd->need_reload = true;
> @@ -4454,6 +4499,14 @@ dpif_netdev_xps_get_tx_qid(const struct
> dp_netdev_pmd_thread *pmd,
> 
>      dpif_netdev_xps_revalidate_pmd(pmd, now, false);
> 
> +    /* The tx queue can change in XPS case, make sure packets in previous
> +     * queue is drained properly. */
> +    if (tx->last_used_qid != LAST_USED_QID_NONE &&
> +               tx->qid != tx->last_used_qid) {
> +        netdev_txq_drain(port->netdev, tx->last_used_qid, port-
> >dynamic_txqs);
> +        tx->last_used_qid = LAST_USED_QID_NONE;
> +    }
> +
>      VLOG_DBG("Core %d: New TX queue ID %d for port \'%s\'.",
>               pmd->core_id, tx->qid, netdev_get_name(tx->port->netdev));
>      return min_qid;
> @@ -4548,6 +4601,13 @@ dp_execute_cb(void *aux_, struct
> dp_packet_batch *packets_,
>                  tx_qid = pmd->static_tx_qid;
>              }
> 
> +            /* In case these packets gets buffered into an intermediate
> +             * queue and XPS is enabled the drain function could find a
> +             * different Tx qid assigned to its thread.  We keep track
> +             * of the qid we're now using, that will trigger the drain
> +             * function and will select the right queue to flush. */
> +            p->last_used_qid = tx_qid;
> +
>              netdev_send(p->port->netdev, tx_qid, packets_, may_steal,
>                          dynamic_txqs);
>              return;
> @@ -4960,7 +5020,7 @@ dpif_dummy_register(enum dummy_level level)
>                               "dp port new-number",
>                               3, 3, dpif_dummy_change_port_number, NULL);
>  }
> -

Remove the character inserted here.

> +
>  /* Datapath Classifier. */
> 
>  /* A set of rules that all have the same fields wildcarded. */
> diff --git a/lib/netdev-bsd.c b/lib/netdev-bsd.c
> index 94c515d..00d5263 100644
> --- a/lib/netdev-bsd.c
> +++ b/lib/netdev-bsd.c
> @@ -1547,6 +1547,7 @@ netdev_bsd_update_flags(struct netdev
> *netdev_, enum netdev_flags off,
>      netdev_bsd_rxq_recv,                             \
>      netdev_bsd_rxq_wait,                             \
>      netdev_bsd_rxq_drain,                            \
> +    NULL,                                            \
>  }
> 
>  const struct netdev_class netdev_bsd_class =
> diff --git a/lib/netdev-dpdk.c b/lib/netdev-dpdk.c
> index 94568a1..3def755 100644
> --- a/lib/netdev-dpdk.c
> +++ b/lib/netdev-dpdk.c
> @@ -166,7 +166,6 @@ static const struct rte_eth_conf port_conf = {
> 
>  enum { DPDK_RING_SIZE = 256 };
>  BUILD_ASSERT_DECL(IS_POW2(DPDK_RING_SIZE));
> -enum { DRAIN_TSC = 200000ULL };
> 
>  enum dpdk_dev_type {
>      DPDK_DEV_ETH = 0,
> @@ -286,15 +285,26 @@ struct dpdk_mp {
>      struct ovs_list list_node OVS_GUARDED_BY(dpdk_mp_mutex);
>  };
> 
> +/* Queue 'INTERIM_QUEUE_BURST_THRESHOLD' packets before
> tranmitting.
> + * Defaults to 'NETDEV_MAX_BURST'(32) now.
> + */
> +#define INTERIM_QUEUE_BURST_THRESHOLD NETDEV_MAX_BURST
> +
>  /* There should be one 'struct dpdk_tx_queue' created for
>   * each cpu core. */
>  struct dpdk_tx_queue {
> +    int count;                     /* Number of buffered packets waiting to
> +                                      be sent. */
>      rte_spinlock_t tx_lock;        /* Protects the members and the NIC queue
>                                      * from concurrent access.  It is used only
>                                      * if the queue is shared among different
>                                      * pmd threads (see 'concurrent_txq'). */
>      int map;                       /* Mapping of configured vhost-user queues
>                                      * to enabled by guest. */
> +    struct rte_mbuf *burst_pkts[INTERIM_QUEUE_BURST_THRESHOLD];
> +                                   /* Intermediate queues where packets can
> +                                    * be buffered to amortize the cost of MMIO
> +                                    * writes. */
>  };
> 
>  /* dpdk has no way to remove dpdk ring ethernet devices
> @@ -1381,6 +1391,7 @@ static inline int
>  netdev_dpdk_eth_tx_burst(struct netdev_dpdk *dev, int qid,
>                           struct rte_mbuf **pkts, int cnt)
>  {
> +    struct dpdk_tx_queue *txq = &dev->tx_q[qid];
>      uint32_t nb_tx = 0;
> 
>      while (nb_tx != cnt) {
> @@ -1404,6 +1415,7 @@ netdev_dpdk_eth_tx_burst(struct netdev_dpdk
> *dev, int qid,
>          }
>      }
> 
> +    txq->count = 0;
>      return cnt - nb_tx;
>  }
> 
> @@ -1788,12 +1800,42 @@ dpdk_do_tx_copy(struct netdev *netdev, int
> qid, struct dp_packet_batch *batch)
>      }
>  }
> 
> +/* Enqueue packets in an intermediate queue and call the burst
> + * function when the queue is full.  This way we can amortize the
> + * cost of MMIO writes. */
> +static inline int
> +netdev_dpdk_eth_tx_queue(struct netdev_dpdk *dev, int qid,
> +                            struct rte_mbuf **pkts, int cnt)
> +{
> +    struct dpdk_tx_queue *txq = &dev->tx_q[qid];
> +

Remove this whitespace.

> +    int i = 0;
> +    int dropped = 0;
> +
> +    while (i < cnt) {
> +        int freeslots = INTERIM_QUEUE_BURST_THRESHOLD - txq->count;
> +        int tocopy = MIN(freeslots, cnt-i);
> +
> +        memcpy(&txq->burst_pkts[txq->count], &pkts[i],
> +               tocopy * sizeof (struct rte_mbuf *));
> +
> +        txq->count += tocopy;
> +        i += tocopy;
> +
> +        /* Queue full, burst the packets */
> +        if (txq->count >= INTERIM_QUEUE_BURST_THRESHOLD) {
> +           dropped += netdev_dpdk_eth_tx_burst(dev, qid, txq->burst_pkts,
> +                   txq->count);
> +        }
> +    }
> +    return dropped;
> +}
> +
>  static int
>  netdev_dpdk_vhost_send(struct netdev *netdev, int qid,
>                         struct dp_packet_batch *batch,
>                         bool may_steal, bool concurrent_txq OVS_UNUSED)
>  {
> -
>      if (OVS_UNLIKELY(!may_steal || batch->packets[0]->source !=
> DPBUF_DPDK)) {
>          dpdk_do_tx_copy(netdev, qid, batch);
>          dp_packet_delete_batch(batch, may_steal);
> @@ -1836,7 +1878,7 @@ netdev_dpdk_send__(struct netdev_dpdk *dev,
> int qid,
>          cnt = netdev_dpdk_qos_run(dev, pkts, cnt);
>          dropped = batch->count - cnt;
> 
> -        dropped += netdev_dpdk_eth_tx_burst(dev, qid, pkts, cnt);
> +        dropped += netdev_dpdk_eth_tx_queue(dev, qid, pkts, cnt);
> 
>          if (OVS_UNLIKELY(dropped)) {
>              rte_spinlock_lock(&dev->stats_lock);
> @@ -1850,6 +1892,30 @@ netdev_dpdk_send__(struct netdev_dpdk *dev,
> int qid,
>      }
>  }
> 
> +/* Drain tx queues, this is called periodically to empty the
> + * intermediate queue in case of few packets (<
> INTERIM_QUEUE_BURST_THRESHOLD)
> + * are buffered into the queue. */
> +static int
> +netdev_dpdk_txq_drain(struct netdev *netdev, int qid, bool
> concurrent_txq)
> +{
> +    struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
> +    struct dpdk_tx_queue *txq = &dev->tx_q[qid];
> +
> +    if (OVS_LIKELY(txq->count)) {
> +        if (OVS_UNLIKELY(concurrent_txq)) {
> +            qid = qid % dev->up.n_txq;
> +            rte_spinlock_lock(&dev->tx_q[qid].tx_lock);
> +        }
> +
> +        netdev_dpdk_eth_tx_burst(dev, qid, txq->burst_pkts, txq->count);
> +
> +        if (OVS_UNLIKELY(concurrent_txq)) {
> +            rte_spinlock_unlock(&dev->tx_q[qid].tx_lock);
> +        }
> +    }
> +    return 0;
> +}
> +
>  static int
>  netdev_dpdk_eth_send(struct netdev *netdev, int qid,
>                       struct dp_packet_batch *batch, bool may_steal,
> @@ -3243,7 +3309,7 @@ unlock:
>                            SET_CONFIG, SET_TX_MULTIQ, SEND,    \
>                            GET_CARRIER, GET_STATS,             \
>                            GET_FEATURES, GET_STATUS,           \
> -                          RECONFIGURE, RXQ_RECV)              \
> +                          RECONFIGURE, RXQ_RECV, TXQ_DRAIN)   \
>  {                                                             \
>      NAME,                                                     \
>      true,                       /* is_pmd */                  \
> @@ -3310,6 +3376,7 @@ unlock:
>      RXQ_RECV,                                                 \
>      NULL,                       /* rx_wait */                 \
>      NULL,                       /* rxq_drain */               \
> +    TXQ_DRAIN,      /* txq_drain */                           \

Align this comment with the rest.

>  }
> 
>  static const struct netdev_class dpdk_class =
> @@ -3326,7 +3393,8 @@ static const struct netdev_class dpdk_class =
>          netdev_dpdk_get_features,
>          netdev_dpdk_get_status,
>          netdev_dpdk_reconfigure,
> -        netdev_dpdk_rxq_recv);
> +        netdev_dpdk_rxq_recv,
> +        netdev_dpdk_txq_drain);
> 
>  static const struct netdev_class dpdk_ring_class =
>      NETDEV_DPDK_CLASS(
> @@ -3342,7 +3410,8 @@ static const struct netdev_class dpdk_ring_class =
>          netdev_dpdk_get_features,
>          netdev_dpdk_get_status,
>          netdev_dpdk_reconfigure,
> -        netdev_dpdk_rxq_recv);
> +        netdev_dpdk_rxq_recv,
> +        NULL);
> 
>  static const struct netdev_class dpdk_vhost_class =
>      NETDEV_DPDK_CLASS(
> @@ -3358,7 +3427,8 @@ static const struct netdev_class dpdk_vhost_class =
>          NULL,
>          NULL,
>          netdev_dpdk_vhost_reconfigure,
> -        netdev_dpdk_vhost_rxq_recv);
> +        netdev_dpdk_vhost_rxq_recv,
> +        NULL);
>  static const struct netdev_class dpdk_vhost_client_class =
>      NETDEV_DPDK_CLASS(
>          "dpdkvhostuserclient",
> @@ -3373,7 +3443,8 @@ static const struct netdev_class
> dpdk_vhost_client_class =
>          NULL,
>          NULL,
>          netdev_dpdk_vhost_client_reconfigure,
> -        netdev_dpdk_vhost_rxq_recv);
> +        netdev_dpdk_vhost_rxq_recv,
> +        NULL);
> 
>  void
>  netdev_dpdk_register(void)
> diff --git a/lib/netdev-dummy.c b/lib/netdev-dummy.c
> index 0657434..4ef659e 100644
> --- a/lib/netdev-dummy.c
> +++ b/lib/netdev-dummy.c
> @@ -1409,6 +1409,7 @@ netdev_dummy_update_flags(struct netdev
> *netdev_,
>      netdev_dummy_rxq_recv,                                      \
>      netdev_dummy_rxq_wait,                                      \
>      netdev_dummy_rxq_drain,                                     \
> +    NULL,                                                       \
>  }
> 
>  static const struct netdev_class dummy_class =
> diff --git a/lib/netdev-linux.c b/lib/netdev-linux.c
> index 9ff1333..79478ee 100644
> --- a/lib/netdev-linux.c
> +++ b/lib/netdev-linux.c
> @@ -2830,6 +2830,7 @@ netdev_linux_update_flags(struct netdev
> *netdev_, enum netdev_flags off,
>      netdev_linux_rxq_recv,                                      \
>      netdev_linux_rxq_wait,                                      \
>      netdev_linux_rxq_drain,                                     \
> +    NULL,                                                       \
>  }
> 
>  const struct netdev_class netdev_linux_class =
> diff --git a/lib/netdev-provider.h b/lib/netdev-provider.h
> index 8346fc4..97e72c6 100644
> --- a/lib/netdev-provider.h
> +++ b/lib/netdev-provider.h
> @@ -335,6 +335,11 @@ struct netdev_class {
>       * If the function returns a non-zero value, some of the packets might
> have
>       * been sent anyway.
>       *
> +     * Some netdev provider - like in case of 'dpdk' - may buffer the batch
> +     * of packets into an intermediate queue.  Buffered packets will be sent
> +     * out when their number will exceed a threshold or by the periodic call
> +     * to the drain function.
> +     *
>       * If 'may_steal' is false, the caller retains ownership of all the
>       * packets.  If 'may_steal' is true, the caller transfers ownership of all
>       * the packets to the network device, regardless of success.
> @@ -769,6 +774,9 @@ struct netdev_class {
> 
>      /* Discards all packets waiting to be received from 'rx'. */
>      int (*rxq_drain)(struct netdev_rxq *rx);
> +
> +    /* Drain all packets waiting to be sent on queue 'qid'. */
> +    int (*txq_drain)(struct netdev *netdev, int qid, bool concurrent_txq);
>  };
> 
>  int netdev_register_provider(const struct netdev_class *);
> diff --git a/lib/netdev-vport.c b/lib/netdev-vport.c
> index 2d0aa43..64cf617 100644
> --- a/lib/netdev-vport.c
> +++ b/lib/netdev-vport.c
> @@ -847,7 +847,8 @@ get_stats(const struct netdev *netdev, struct
> netdev_stats *stats)
>      NULL,                   /* rx_dealloc */                \
>      NULL,                   /* rx_recv */                   \
>      NULL,                   /* rx_wait */                   \
> -    NULL,                   /* rx_drain */
> +    NULL,                   /* rx_drain */                  \
> +    NULL,                   /* tx_drain */
> 
> 
>  #define TUNNEL_CLASS(NAME, DPIF_PORT, BUILD_HEADER,
> PUSH_HEADER, POP_HEADER)   \
> diff --git a/lib/netdev.c b/lib/netdev.c
> index 1e6bb2b..5e0c53f 100644
> --- a/lib/netdev.c
> +++ b/lib/netdev.c
> @@ -670,6 +670,15 @@ netdev_rxq_drain(struct netdev_rxq *rx)
>              : 0);
>  }
> 
> +/* Flush packets on the queue 'qid'. */
> +int
> +netdev_txq_drain(struct netdev *netdev, int qid, bool netdev_txq_drain)
> +{
> +    return (netdev->netdev_class->txq_drain
> +            ? netdev->netdev_class->txq_drain(netdev, qid, netdev_txq_drain)
> +            : EOPNOTSUPP);
> +}
> +
>  /* Configures the number of tx queues of 'netdev'. Returns 0 if successful,
>   * otherwise a positive errno value.
>   *
> diff --git a/lib/netdev.h b/lib/netdev.h
> index d6c07c1..7ddd790 100644
> --- a/lib/netdev.h
> +++ b/lib/netdev.h
> @@ -155,6 +155,7 @@ int netdev_rxq_drain(struct netdev_rxq *);
>  int netdev_send(struct netdev *, int qid, struct dp_packet_batch *,
>                  bool may_steal, bool concurrent_txq);
>  void netdev_send_wait(struct netdev *, int qid);
> +int netdev_txq_drain(struct netdev *, int qid, bool concurrent_txq);
> 
>  /* native tunnel APIs */
>  /* Structure to pass parameters required to build a tunnel header. */
> --
> 2.4.11
> 
> _______________________________________________
> dev mailing list
> dev@openvswitch.org
> https://mail.openvswitch.org/mailman/listinfo/ovs-dev
diff mbox

Patch

diff --git a/lib/dp-packet.h b/lib/dp-packet.h
index 17b7026..9e3912a 100644
--- a/lib/dp-packet.h
+++ b/lib/dp-packet.h
@@ -39,7 +39,7 @@  enum OVS_PACKED_ENUM dp_packet_source {
     DPBUF_STACK,               /* Un-movable stack space or static buffer. */
     DPBUF_STUB,                /* Starts on stack, may expand into heap. */
     DPBUF_DPDK,                /* buffer data is from DPDK allocated memory.
-                                * ref to build_dp_packet() in netdev-dpdk. */
+                                * Ref dp_packet_init_dpdk() in dp-packet.c */
 };
 
 #define DP_PACKET_CONTEXT_SIZE 64
diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index 719a518..b0d47fa 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -289,6 +289,8 @@  struct dp_netdev_rxq {
     struct dp_netdev_pmd_thread *pmd;  /* pmd thread that will poll this queue. */
 };
 
+#define LAST_USED_QID_NONE -1
+
 /* A port in a netdev-based datapath. */
 struct dp_netdev_port {
     odp_port_t port_no;
@@ -437,8 +439,14 @@  struct rxq_poll {
 struct tx_port {
     struct dp_netdev_port *port;
     int qid;
-    long long last_used;
+    long long last_used;        /* In case XPS is enabled, it contains the
+				 * timestamp of the last time the port was
+				 * used by the thread to send data.  After
+				 * XPS_TIMEOUT_MS elapses the qid will be
+				 * marked as -1. */
     struct hmap_node node;
+    int last_used_qid;          /* Last queue id where packets could be
+                                   enqueued. */
 };
 
 /* PMD: Poll modes drivers.  PMD accesses devices via polling to eliminate
@@ -2900,6 +2908,25 @@  cycles_count_end(struct dp_netdev_pmd_thread *pmd,
 }
 
 static void
+dp_netdev_drain_txq_ports(struct dp_netdev_pmd_thread *pmd)
+{
+    struct tx_port *cached_tx_port;
+    int tx_qid;
+
+    HMAP_FOR_EACH (cached_tx_port, node, &pmd->send_port_cache) {
+        tx_qid = cached_tx_port->last_used_qid;
+
+        if (tx_qid != LAST_USED_QID_NONE) {
+            netdev_txq_drain(cached_tx_port->port->netdev, tx_qid,
+                         cached_tx_port->port->dynamic_txqs);
+
+            /* Queue drained and mark it empty. */
+            cached_tx_port->last_used_qid = LAST_USED_QID_NONE;
+        }
+    }
+}
+
+static void
 dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
                            struct netdev_rxq *rx,
                            odp_port_t port_no)
@@ -3514,15 +3541,18 @@  pmd_load_queues_and_ports(struct dp_netdev_pmd_thread *pmd,
     return i;
 }
 
+enum { DRAIN_TSC = 20000ULL };
+
 static void *
 pmd_thread_main(void *f_)
 {
     struct dp_netdev_pmd_thread *pmd = f_;
-    unsigned int lc = 0;
+    unsigned int lc = 0, lc_drain = 0;
     struct polled_queue *poll_list;
     bool exiting;
     int poll_cnt;
     int i;
+    uint64_t prev = 0, now = 0;
 
     poll_list = NULL;
 
@@ -3555,6 +3585,17 @@  reload:
                                        poll_list[i].port_no);
         }
 
+#define MAX_LOOP_TO_DRAIN 128
+        if (lc_drain++ > MAX_LOOP_TO_DRAIN) {
+            lc_drain = 0;
+            prev = now;
+            now = pmd->last_cycles;
+
+            if ((now - prev) > DRAIN_TSC) {
+                dp_netdev_drain_txq_ports(pmd);
+            }
+        }
+
         if (lc++ > 1024) {
             bool reload;
 
@@ -3573,6 +3614,9 @@  reload:
         }
     }
 
+    /* Drain the queues as part of reconfiguration */
+    dp_netdev_drain_txq_ports(pmd);
+
     poll_cnt = pmd_load_queues_and_ports(pmd, &poll_list);
     exiting = latch_is_set(&pmd->exit_latch);
     /* Signal here to make sure the pmd finishes
@@ -3890,6 +3934,7 @@  dp_netdev_add_port_tx_to_pmd(struct dp_netdev_pmd_thread *pmd,
 
     tx->port = port;
     tx->qid = -1;
+    tx->last_used_qid = LAST_USED_QID_NONE;
 
     hmap_insert(&pmd->tx_ports, &tx->node, hash_port_no(tx->port->port_no));
     pmd->need_reload = true;
@@ -4454,6 +4499,14 @@  dpif_netdev_xps_get_tx_qid(const struct dp_netdev_pmd_thread *pmd,
 
     dpif_netdev_xps_revalidate_pmd(pmd, now, false);
 
+    /* The tx queue can change in XPS case, make sure packets in previous
+     * queue is drained properly. */
+    if (tx->last_used_qid != LAST_USED_QID_NONE &&
+               tx->qid != tx->last_used_qid) {
+        netdev_txq_drain(port->netdev, tx->last_used_qid, port->dynamic_txqs);
+        tx->last_used_qid = LAST_USED_QID_NONE;
+    }
+
     VLOG_DBG("Core %d: New TX queue ID %d for port \'%s\'.",
              pmd->core_id, tx->qid, netdev_get_name(tx->port->netdev));
     return min_qid;
@@ -4548,6 +4601,13 @@  dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
                 tx_qid = pmd->static_tx_qid;
             }
 
+            /* In case these packets gets buffered into an intermediate
+             * queue and XPS is enabled the drain function could find a
+             * different Tx qid assigned to its thread.  We keep track
+             * of the qid we're now using, that will trigger the drain
+             * function and will select the right queue to flush. */
+            p->last_used_qid = tx_qid;
+
             netdev_send(p->port->netdev, tx_qid, packets_, may_steal,
                         dynamic_txqs);
             return;
@@ -4960,7 +5020,7 @@  dpif_dummy_register(enum dummy_level level)
                              "dp port new-number",
                              3, 3, dpif_dummy_change_port_number, NULL);
 }
-
+
 /* Datapath Classifier. */
 
 /* A set of rules that all have the same fields wildcarded. */
diff --git a/lib/netdev-bsd.c b/lib/netdev-bsd.c
index 94c515d..00d5263 100644
--- a/lib/netdev-bsd.c
+++ b/lib/netdev-bsd.c
@@ -1547,6 +1547,7 @@  netdev_bsd_update_flags(struct netdev *netdev_, enum netdev_flags off,
     netdev_bsd_rxq_recv,                             \
     netdev_bsd_rxq_wait,                             \
     netdev_bsd_rxq_drain,                            \
+    NULL,                                            \
 }
 
 const struct netdev_class netdev_bsd_class =
diff --git a/lib/netdev-dpdk.c b/lib/netdev-dpdk.c
index 94568a1..3def755 100644
--- a/lib/netdev-dpdk.c
+++ b/lib/netdev-dpdk.c
@@ -166,7 +166,6 @@  static const struct rte_eth_conf port_conf = {
 
 enum { DPDK_RING_SIZE = 256 };
 BUILD_ASSERT_DECL(IS_POW2(DPDK_RING_SIZE));
-enum { DRAIN_TSC = 200000ULL };
 
 enum dpdk_dev_type {
     DPDK_DEV_ETH = 0,
@@ -286,15 +285,26 @@  struct dpdk_mp {
     struct ovs_list list_node OVS_GUARDED_BY(dpdk_mp_mutex);
 };
 
+/* Queue 'INTERIM_QUEUE_BURST_THRESHOLD' packets before tranmitting.
+ * Defaults to 'NETDEV_MAX_BURST'(32) now.
+ */
+#define INTERIM_QUEUE_BURST_THRESHOLD NETDEV_MAX_BURST
+
 /* There should be one 'struct dpdk_tx_queue' created for
  * each cpu core. */
 struct dpdk_tx_queue {
+    int count;                     /* Number of buffered packets waiting to
+                                      be sent. */
     rte_spinlock_t tx_lock;        /* Protects the members and the NIC queue
                                     * from concurrent access.  It is used only
                                     * if the queue is shared among different
                                     * pmd threads (see 'concurrent_txq'). */
     int map;                       /* Mapping of configured vhost-user queues
                                     * to enabled by guest. */
+    struct rte_mbuf *burst_pkts[INTERIM_QUEUE_BURST_THRESHOLD];
+                                   /* Intermediate queues where packets can
+                                    * be buffered to amortize the cost of MMIO
+                                    * writes. */
 };
 
 /* dpdk has no way to remove dpdk ring ethernet devices
@@ -1381,6 +1391,7 @@  static inline int
 netdev_dpdk_eth_tx_burst(struct netdev_dpdk *dev, int qid,
                          struct rte_mbuf **pkts, int cnt)
 {
+    struct dpdk_tx_queue *txq = &dev->tx_q[qid];
     uint32_t nb_tx = 0;
 
     while (nb_tx != cnt) {
@@ -1404,6 +1415,7 @@  netdev_dpdk_eth_tx_burst(struct netdev_dpdk *dev, int qid,
         }
     }
 
+    txq->count = 0;
     return cnt - nb_tx;
 }
 
@@ -1788,12 +1800,42 @@  dpdk_do_tx_copy(struct netdev *netdev, int qid, struct dp_packet_batch *batch)
     }
 }
 
+/* Enqueue packets in an intermediate queue and call the burst
+ * function when the queue is full.  This way we can amortize the
+ * cost of MMIO writes. */
+static inline int
+netdev_dpdk_eth_tx_queue(struct netdev_dpdk *dev, int qid,
+                            struct rte_mbuf **pkts, int cnt)
+{
+    struct dpdk_tx_queue *txq = &dev->tx_q[qid];
+
+    int i = 0;
+    int dropped = 0;
+
+    while (i < cnt) {
+        int freeslots = INTERIM_QUEUE_BURST_THRESHOLD - txq->count;
+        int tocopy = MIN(freeslots, cnt-i);
+
+        memcpy(&txq->burst_pkts[txq->count], &pkts[i],
+               tocopy * sizeof (struct rte_mbuf *));
+
+        txq->count += tocopy;
+        i += tocopy;
+
+        /* Queue full, burst the packets */
+        if (txq->count >= INTERIM_QUEUE_BURST_THRESHOLD) {
+           dropped += netdev_dpdk_eth_tx_burst(dev, qid, txq->burst_pkts,
+                   txq->count);
+        }
+    }
+    return dropped;
+}
+
 static int
 netdev_dpdk_vhost_send(struct netdev *netdev, int qid,
                        struct dp_packet_batch *batch,
                        bool may_steal, bool concurrent_txq OVS_UNUSED)
 {
-
     if (OVS_UNLIKELY(!may_steal || batch->packets[0]->source != DPBUF_DPDK)) {
         dpdk_do_tx_copy(netdev, qid, batch);
         dp_packet_delete_batch(batch, may_steal);
@@ -1836,7 +1878,7 @@  netdev_dpdk_send__(struct netdev_dpdk *dev, int qid,
         cnt = netdev_dpdk_qos_run(dev, pkts, cnt);
         dropped = batch->count - cnt;
 
-        dropped += netdev_dpdk_eth_tx_burst(dev, qid, pkts, cnt);
+        dropped += netdev_dpdk_eth_tx_queue(dev, qid, pkts, cnt);
 
         if (OVS_UNLIKELY(dropped)) {
             rte_spinlock_lock(&dev->stats_lock);
@@ -1850,6 +1892,30 @@  netdev_dpdk_send__(struct netdev_dpdk *dev, int qid,
     }
 }
 
+/* Drain tx queues, this is called periodically to empty the
+ * intermediate queue in case of few packets (< INTERIM_QUEUE_BURST_THRESHOLD)
+ * are buffered into the queue. */
+static int
+netdev_dpdk_txq_drain(struct netdev *netdev, int qid, bool concurrent_txq)
+{
+    struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
+    struct dpdk_tx_queue *txq = &dev->tx_q[qid];
+
+    if (OVS_LIKELY(txq->count)) {
+        if (OVS_UNLIKELY(concurrent_txq)) {
+            qid = qid % dev->up.n_txq;
+            rte_spinlock_lock(&dev->tx_q[qid].tx_lock);
+        }
+
+        netdev_dpdk_eth_tx_burst(dev, qid, txq->burst_pkts, txq->count);
+
+        if (OVS_UNLIKELY(concurrent_txq)) {
+            rte_spinlock_unlock(&dev->tx_q[qid].tx_lock);
+        }
+    }
+    return 0;
+}
+
 static int
 netdev_dpdk_eth_send(struct netdev *netdev, int qid,
                      struct dp_packet_batch *batch, bool may_steal,
@@ -3243,7 +3309,7 @@  unlock:
                           SET_CONFIG, SET_TX_MULTIQ, SEND,    \
                           GET_CARRIER, GET_STATS,             \
                           GET_FEATURES, GET_STATUS,           \
-                          RECONFIGURE, RXQ_RECV)              \
+                          RECONFIGURE, RXQ_RECV, TXQ_DRAIN)   \
 {                                                             \
     NAME,                                                     \
     true,                       /* is_pmd */                  \
@@ -3310,6 +3376,7 @@  unlock:
     RXQ_RECV,                                                 \
     NULL,                       /* rx_wait */                 \
     NULL,                       /* rxq_drain */               \
+    TXQ_DRAIN,      /* txq_drain */                           \
 }
 
 static const struct netdev_class dpdk_class =
@@ -3326,7 +3393,8 @@  static const struct netdev_class dpdk_class =
         netdev_dpdk_get_features,
         netdev_dpdk_get_status,
         netdev_dpdk_reconfigure,
-        netdev_dpdk_rxq_recv);
+        netdev_dpdk_rxq_recv,
+        netdev_dpdk_txq_drain);
 
 static const struct netdev_class dpdk_ring_class =
     NETDEV_DPDK_CLASS(
@@ -3342,7 +3410,8 @@  static const struct netdev_class dpdk_ring_class =
         netdev_dpdk_get_features,
         netdev_dpdk_get_status,
         netdev_dpdk_reconfigure,
-        netdev_dpdk_rxq_recv);
+        netdev_dpdk_rxq_recv,
+        NULL);
 
 static const struct netdev_class dpdk_vhost_class =
     NETDEV_DPDK_CLASS(
@@ -3358,7 +3427,8 @@  static const struct netdev_class dpdk_vhost_class =
         NULL,
         NULL,
         netdev_dpdk_vhost_reconfigure,
-        netdev_dpdk_vhost_rxq_recv);
+        netdev_dpdk_vhost_rxq_recv,
+        NULL);
 static const struct netdev_class dpdk_vhost_client_class =
     NETDEV_DPDK_CLASS(
         "dpdkvhostuserclient",
@@ -3373,7 +3443,8 @@  static const struct netdev_class dpdk_vhost_client_class =
         NULL,
         NULL,
         netdev_dpdk_vhost_client_reconfigure,
-        netdev_dpdk_vhost_rxq_recv);
+        netdev_dpdk_vhost_rxq_recv,
+        NULL);
 
 void
 netdev_dpdk_register(void)
diff --git a/lib/netdev-dummy.c b/lib/netdev-dummy.c
index 0657434..4ef659e 100644
--- a/lib/netdev-dummy.c
+++ b/lib/netdev-dummy.c
@@ -1409,6 +1409,7 @@  netdev_dummy_update_flags(struct netdev *netdev_,
     netdev_dummy_rxq_recv,                                      \
     netdev_dummy_rxq_wait,                                      \
     netdev_dummy_rxq_drain,                                     \
+    NULL,                                                       \
 }
 
 static const struct netdev_class dummy_class =
diff --git a/lib/netdev-linux.c b/lib/netdev-linux.c
index 9ff1333..79478ee 100644
--- a/lib/netdev-linux.c
+++ b/lib/netdev-linux.c
@@ -2830,6 +2830,7 @@  netdev_linux_update_flags(struct netdev *netdev_, enum netdev_flags off,
     netdev_linux_rxq_recv,                                      \
     netdev_linux_rxq_wait,                                      \
     netdev_linux_rxq_drain,                                     \
+    NULL,                                                       \
 }
 
 const struct netdev_class netdev_linux_class =
diff --git a/lib/netdev-provider.h b/lib/netdev-provider.h
index 8346fc4..97e72c6 100644
--- a/lib/netdev-provider.h
+++ b/lib/netdev-provider.h
@@ -335,6 +335,11 @@  struct netdev_class {
      * If the function returns a non-zero value, some of the packets might have
      * been sent anyway.
      *
+     * Some netdev provider - like in case of 'dpdk' - may buffer the batch
+     * of packets into an intermediate queue.  Buffered packets will be sent
+     * out when their number will exceed a threshold or by the periodic call
+     * to the drain function.
+     *
      * If 'may_steal' is false, the caller retains ownership of all the
      * packets.  If 'may_steal' is true, the caller transfers ownership of all
      * the packets to the network device, regardless of success.
@@ -769,6 +774,9 @@  struct netdev_class {
 
     /* Discards all packets waiting to be received from 'rx'. */
     int (*rxq_drain)(struct netdev_rxq *rx);
+
+    /* Drain all packets waiting to be sent on queue 'qid'. */
+    int (*txq_drain)(struct netdev *netdev, int qid, bool concurrent_txq);
 };
 
 int netdev_register_provider(const struct netdev_class *);
diff --git a/lib/netdev-vport.c b/lib/netdev-vport.c
index 2d0aa43..64cf617 100644
--- a/lib/netdev-vport.c
+++ b/lib/netdev-vport.c
@@ -847,7 +847,8 @@  get_stats(const struct netdev *netdev, struct netdev_stats *stats)
     NULL,                   /* rx_dealloc */                \
     NULL,                   /* rx_recv */                   \
     NULL,                   /* rx_wait */                   \
-    NULL,                   /* rx_drain */
+    NULL,                   /* rx_drain */                  \
+    NULL,                   /* tx_drain */
 
 
 #define TUNNEL_CLASS(NAME, DPIF_PORT, BUILD_HEADER, PUSH_HEADER, POP_HEADER)   \
diff --git a/lib/netdev.c b/lib/netdev.c
index 1e6bb2b..5e0c53f 100644
--- a/lib/netdev.c
+++ b/lib/netdev.c
@@ -670,6 +670,15 @@  netdev_rxq_drain(struct netdev_rxq *rx)
             : 0);
 }
 
+/* Flush packets on the queue 'qid'. */
+int
+netdev_txq_drain(struct netdev *netdev, int qid, bool netdev_txq_drain)
+{
+    return (netdev->netdev_class->txq_drain
+            ? netdev->netdev_class->txq_drain(netdev, qid, netdev_txq_drain)
+            : EOPNOTSUPP);
+}
+
 /* Configures the number of tx queues of 'netdev'. Returns 0 if successful,
  * otherwise a positive errno value.
  *
diff --git a/lib/netdev.h b/lib/netdev.h
index d6c07c1..7ddd790 100644
--- a/lib/netdev.h
+++ b/lib/netdev.h
@@ -155,6 +155,7 @@  int netdev_rxq_drain(struct netdev_rxq *);
 int netdev_send(struct netdev *, int qid, struct dp_packet_batch *,
                 bool may_steal, bool concurrent_txq);
 void netdev_send_wait(struct netdev *, int qid);
+int netdev_txq_drain(struct netdev *, int qid, bool concurrent_txq);
 
 /* native tunnel APIs */
 /* Structure to pass parameters required to build a tunnel header. */