
[ovs-dev,v3,4/6] netdev-dpdk: Add intermediate queue support.

Message ID 1498775976-4142-5-git-send-email-bhanuprakash.bodireddy@intel.com
State Not Applicable
Delegated to: Darrell Ball

Commit Message

Bodireddy, Bhanuprakash June 29, 2017, 10:39 p.m. UTC
This commit introduces the netdev_dpdk_eth_tx_queue() function, which
implements an intermediate queue and packet buffering. Packets are
buffered until the threshold INTERIM_QUEUE_BURST_THRESHOLD (32) is
reached and are then transmitted in a burst.

To handle cases (e.g. ping) where packets are sent at a low rate and
could otherwise get stuck in the queue, flush logic is implemented;
it is invoked from dp_netdev_flush_txq_ports() as part of the PMD
packet processing loop.
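
For illustration, here is a minimal standalone sketch of the buffering-and-flush
pattern described above. The toy_* names exist only for this example; the real
code operates on struct rte_mbuf pointers stored in struct dpdk_tx_queue and
bursts them with netdev_dpdk_eth_tx_burst().

    /* Toy model: buffer packets per tx queue, burst when the threshold is
     * reached, and flush explicitly for low-rate traffic. */
    #include <stdio.h>
    #include <string.h>

    #define INTERIM_QUEUE_BURST_THRESHOLD 32

    struct toy_pkt { int id; };

    struct toy_txq {
        struct toy_pkt *pkts[INTERIM_QUEUE_BURST_THRESHOLD];
        int cnt;                      /* Number of buffered packets. */
    };

    /* Stand-in for the real burst transmit (rte_eth_tx_burst() in DPDK). */
    static int
    toy_tx_burst(struct toy_txq *txq)
    {
        printf("bursting %d packets\n", txq->cnt);
        txq->cnt = 0;                 /* Queue is drained after the burst. */
        return 0;                     /* No drops in this toy model. */
    }

    /* Buffer packets and burst only when the threshold is reached. */
    static int
    toy_tx_queue(struct toy_txq *txq, struct toy_pkt **pkts, int cnt)
    {
        int i = 0, dropped = 0;

        while (i < cnt) {
            int freeslots = INTERIM_QUEUE_BURST_THRESHOLD - txq->cnt;
            int tocopy = cnt - i < freeslots ? cnt - i : freeslots;

            memcpy(&txq->pkts[txq->cnt], &pkts[i], tocopy * sizeof pkts[0]);
            txq->cnt += tocopy;
            i += tocopy;

            if (txq->cnt >= INTERIM_QUEUE_BURST_THRESHOLD) {
                dropped += toy_tx_burst(txq);
            }
        }
        return dropped;
    }

    /* Explicit flush for packets that never reach the threshold. */
    static void
    toy_txq_flush(struct toy_txq *txq)
    {
        if (txq->cnt) {
            toy_tx_burst(txq);
        }
    }

    int
    main(void)
    {
        struct toy_txq txq = { .cnt = 0 };
        struct toy_pkt p[40];
        struct toy_pkt *pp[40];

        for (int i = 0; i < 40; i++) {
            p[i].id = i;
            pp[i] = &p[i];
        }

        toy_tx_queue(&txq, pp, 40);   /* Bursts 32, leaves 8 buffered. */
        toy_txq_flush(&txq);          /* PMD-style flush picks up the rest. */
        return 0;
    }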

Signed-off-by: Bhanuprakash Bodireddy <bhanuprakash.bodireddy@intel.com>
Signed-off-by: Antonio Fischetti <antonio.fischetti@intel.com>
Co-authored-by: Antonio Fischetti <antonio.fischetti@intel.com>
Signed-off-by: Markus Magnusson <markus.magnusson@ericsson.com>
Co-authored-by: Markus Magnusson <markus.magnusson@ericsson.com>
Acked-by: Eelco Chaudron <echaudro@redhat.com>
---
 lib/dpif-netdev.c | 44 +++++++++++++++++++++++++++++++++++++++++++-
 lib/netdev-dpdk.c | 37 +++++++++++++++++++++++++++++++++++--
 2 files changed, 78 insertions(+), 3 deletions(-)

Comments

Darrell Ball Aug. 7, 2017, 5:31 a.m. UTC | #1
-----Original Message-----
From: <ovs-dev-bounces@openvswitch.org> on behalf of Bhanuprakash Bodireddy <bhanuprakash.bodireddy@intel.com>

Date: Thursday, June 29, 2017 at 3:39 PM
To: "dev@openvswitch.org" <dev@openvswitch.org>
Subject: [ovs-dev] [PATCH v3 4/6] netdev-dpdk: Add intermediate queue support.

    This commit introduces the netdev_dpdk_eth_tx_queue() function, which
    implements an intermediate queue and packet buffering. Packets are
    buffered until the threshold INTERIM_QUEUE_BURST_THRESHOLD (32) is
    reached and are then transmitted in a burst.

    To handle cases (e.g. ping) where packets are sent at a low rate and
    could otherwise get stuck in the queue, flush logic is implemented;
    it is invoked from dp_netdev_flush_txq_ports() as part of the PMD
    packet processing loop.
    
    Signed-off-by: Bhanuprakash Bodireddy <bhanuprakash.bodireddy@intel.com>

    Signed-off-by: Antonio Fischetti <antonio.fischetti@intel.com>

    Co-authored-by: Antonio Fischetti <antonio.fischetti@intel.com>
    Signed-off-by: Markus Magnusson <markus.magnusson@ericsson.com>

    Co-authored-by: Markus Magnusson <markus.magnusson@ericsson.com>
    Acked-by: Eelco Chaudron <echaudro@redhat.com>

    ---
     lib/dpif-netdev.c | 44 +++++++++++++++++++++++++++++++++++++++++++-
     lib/netdev-dpdk.c | 37 +++++++++++++++++++++++++++++++++++--
     2 files changed, 78 insertions(+), 3 deletions(-)
    
    diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
    index 4e29085..7e1f5bc 100644
    --- a/lib/dpif-netdev.c
    +++ b/lib/dpif-netdev.c
    @@ -332,6 +332,7 @@ enum pmd_cycles_counter_type {
     };
     
     #define XPS_TIMEOUT_MS 500LL
    +#define LAST_USED_QID_NONE -1
     
     /* Contained by struct dp_netdev_port's 'rxqs' member.  */
     struct dp_netdev_rxq {
    @@ -492,7 +493,13 @@ struct rxq_poll {
     struct tx_port {
         struct dp_netdev_port *port;
         int qid;
    -    long long last_used;
    +    int last_used_qid;        /* Last queue id where packets got
    +                                 enqueued. */
    +    long long last_used;      /* In case XPS is enabled, it contains the
    +                               * timestamp of the last time the port was
    +                               * used by the thread to send data.  After
    +                               * XPS_TIMEOUT_MS elapses the qid will be
    +                               * marked as -1. */
         struct hmap_node node;
     };
     
    @@ -3080,6 +3087,25 @@ cycles_count_end(struct dp_netdev_pmd_thread *pmd,
     }
     
     static void
    +dp_netdev_flush_txq_ports(struct dp_netdev_pmd_thread *pmd)
    +{
    +    struct tx_port *cached_tx_port;
    +    int tx_qid;
    +
    +    HMAP_FOR_EACH (cached_tx_port, node, &pmd->send_port_cache) {
    +        tx_qid = cached_tx_port->last_used_qid;
    +
    +        if (tx_qid != LAST_USED_QID_NONE) {
    +            netdev_txq_flush(cached_tx_port->port->netdev, tx_qid,
    +                         cached_tx_port->port->dynamic_txqs);
    +
    +            /* Queue flushed and mark it empty. */
    +            cached_tx_port->last_used_qid = LAST_USED_QID_NONE;
    +        }
    +    }
    +}
    +

Could you move this function, and I think the other code in dpif-netdev.c, to
patch 6 if you can?
This function is unused, so it will generate a build error with -Werror when
applied in sequence, and logically this seems like it can go into patch 6.
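
For reference, the failure mode is easy to reproduce: a static function that is
defined but never called trips GCC's -Wunused-function (enabled by -Wall), and
-Werror promotes that warning to a hard build error. A minimal sketch, assuming
a GCC build with -Wall -Werror; the file and function names are hypothetical:

    /* unused.c: compile with "gcc -Wall -Werror -c unused.c".
     * The static helper below is defined but never called, so
     * -Wunused-function fires and -Werror turns it into a build error,
     * which is the situation dp_netdev_flush_txq_ports() would be in if
     * it landed before the patch that starts calling it. */

    static void
    never_called(void)
    {
    }

    int
    main(void)
    {
        return 0;
    }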

Darrell


Bodireddy, Bhanuprakash Aug. 8, 2017, 9:28 a.m. UTC | #2
>     static void
>    +dp_netdev_flush_txq_ports(struct dp_netdev_pmd_thread *pmd)
>    +{
>    +    struct tx_port *cached_tx_port;
>    +    int tx_qid;
>    +
>    +    HMAP_FOR_EACH (cached_tx_port, node, &pmd->send_port_cache) {
>    +        tx_qid = cached_tx_port->last_used_qid;
>    +
>    +        if (tx_qid != LAST_USED_QID_NONE) {
>    +            netdev_txq_flush(cached_tx_port->port->netdev, tx_qid,
>    +                         cached_tx_port->port->dynamic_txqs);
>    +
>    +            /* Queue flushed and mark it empty. */
>    +            cached_tx_port->last_used_qid = LAST_USED_QID_NONE;
>    +        }
>    +    }
>    +}
>    +
>
>Could you move this function, and I think the other code in dpif-netdev.c, to
>patch 6 if you can?


Should be a simple change. Will do this.

>This function is unused, so it will generate a build error with -Werror when
>applied in sequence, and logically this seems like it can go into patch 6.


Completely agree. 

- Bhanuprakash.


Patch

diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index 4e29085..7e1f5bc 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -332,6 +332,7 @@  enum pmd_cycles_counter_type {
 };
 
 #define XPS_TIMEOUT_MS 500LL
+#define LAST_USED_QID_NONE -1
 
 /* Contained by struct dp_netdev_port's 'rxqs' member.  */
 struct dp_netdev_rxq {
@@ -492,7 +493,13 @@  struct rxq_poll {
 struct tx_port {
     struct dp_netdev_port *port;
     int qid;
-    long long last_used;
+    int last_used_qid;        /* Last queue id where packets got
+                                 enqueued. */
+    long long last_used;      /* In case XPS is enabled, it contains the
+                               * timestamp of the last time the port was
+                               * used by the thread to send data.  After
+                               * XPS_TIMEOUT_MS elapses the qid will be
+                               * marked as -1. */
     struct hmap_node node;
 };
 
@@ -3080,6 +3087,25 @@  cycles_count_end(struct dp_netdev_pmd_thread *pmd,
 }
 
 static void
+dp_netdev_flush_txq_ports(struct dp_netdev_pmd_thread *pmd)
+{
+    struct tx_port *cached_tx_port;
+    int tx_qid;
+
+    HMAP_FOR_EACH (cached_tx_port, node, &pmd->send_port_cache) {
+        tx_qid = cached_tx_port->last_used_qid;
+
+        if (tx_qid != LAST_USED_QID_NONE) {
+            netdev_txq_flush(cached_tx_port->port->netdev, tx_qid,
+                         cached_tx_port->port->dynamic_txqs);
+
+            /* Queue flushed and mark it empty. */
+            cached_tx_port->last_used_qid = LAST_USED_QID_NONE;
+        }
+    }
+}
+
+static void
 dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
                            struct netdev_rxq *rx,
                            odp_port_t port_no)
@@ -4355,6 +4381,7 @@  dp_netdev_add_port_tx_to_pmd(struct dp_netdev_pmd_thread *pmd,
 
     tx->port = port;
     tx->qid = -1;
+    tx->last_used_qid = LAST_USED_QID_NONE;
 
     hmap_insert(&pmd->tx_ports, &tx->node, hash_port_no(tx->port->port_no));
     pmd->need_reload = true;
@@ -4925,6 +4952,14 @@  dpif_netdev_xps_get_tx_qid(const struct dp_netdev_pmd_thread *pmd,
 
     dpif_netdev_xps_revalidate_pmd(pmd, now, false);
 
+    /* The tx queue can change in the XPS case, so make sure packets in the
+     * previous queue are flushed properly. */
+    if (tx->last_used_qid != LAST_USED_QID_NONE &&
+               tx->qid != tx->last_used_qid) {
+        netdev_txq_flush(port->netdev, tx->last_used_qid, port->dynamic_txqs);
+        tx->last_used_qid = LAST_USED_QID_NONE;
+    }
+
     VLOG_DBG("Core %d: New TX queue ID %d for port \'%s\'.",
              pmd->core_id, tx->qid, netdev_get_name(tx->port->netdev));
     return min_qid;
@@ -5020,6 +5055,13 @@  dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
                 tx_qid = pmd->static_tx_qid;
             }
 
+            /* In case these packets get buffered into an intermediate
+             * queue and XPS is enabled, the flush function could find a
+             * different tx qid assigned to its thread.  We keep track
+             * of the qid we're using now, so that the flush function
+             * will select the right queue to flush. */
+            p->last_used_qid = tx_qid;
+
             netdev_send(p->port->netdev, tx_qid, packets_, may_steal,
                         dynamic_txqs);
             return;
diff --git a/lib/netdev-dpdk.c b/lib/netdev-dpdk.c
index 51d528d..99ad8c7 100644
--- a/lib/netdev-dpdk.c
+++ b/lib/netdev-dpdk.c
@@ -1444,6 +1444,7 @@  static inline int
 netdev_dpdk_eth_tx_burst(struct netdev_dpdk *dev, int qid,
                          struct rte_mbuf **pkts, int cnt)
 {
+    struct dpdk_tx_queue *txq = &dev->tx_q[qid];
     uint32_t nb_tx = 0;
 
     while (nb_tx != cnt) {
@@ -1467,6 +1468,7 @@  netdev_dpdk_eth_tx_burst(struct netdev_dpdk *dev, int qid,
         }
     }
 
+    txq->dpdk_pkt_cnt = 0;
     return cnt - nb_tx;
 }
 
@@ -1866,6 +1868,37 @@  out:
     }
 }
 
+/* Enqueue packets in an intermediate queue and call the flush
+ * function when the queue is full.  This way we can amortize the
+ * cost of MMIO writes. */
+static inline int
+netdev_dpdk_eth_tx_queue(struct netdev_dpdk *dev, int qid,
+                            struct rte_mbuf **pkts, int cnt)
+{
+    struct dpdk_tx_queue *txq = &dev->tx_q[qid];
+
+    int i = 0;
+    int dropped = 0;
+
+    while (i < cnt) {
+        int freeslots = INTERIM_QUEUE_BURST_THRESHOLD - txq->dpdk_pkt_cnt;
+        int tocopy = MIN(freeslots, cnt-i);
+
+        memcpy(&txq->dpdk_burst_pkts[txq->dpdk_pkt_cnt], &pkts[i],
+               tocopy * sizeof (struct rte_mbuf *));
+
+        txq->dpdk_pkt_cnt += tocopy;
+        i += tocopy;
+
+        /* Queue full, burst the packets. */
+        if (txq->dpdk_pkt_cnt >= INTERIM_QUEUE_BURST_THRESHOLD) {
+            dropped += netdev_dpdk_eth_tx_burst(dev, qid, txq->dpdk_burst_pkts,
+                                                txq->dpdk_pkt_cnt);
+        }
+    }
+    return dropped;
+}
+
 /* Tx function. Transmit packets indefinitely */
 static void
 dpdk_do_tx_copy(struct netdev *netdev, int qid, struct dp_packet_batch *batch)
@@ -1923,7 +1956,7 @@  dpdk_do_tx_copy(struct netdev *netdev, int qid, struct dp_packet_batch *batch)
         newcnt = netdev_dpdk_qos_run(dev, pkts, newcnt);
 
         dropped += qos_pkts - newcnt;
-        dropped += netdev_dpdk_eth_tx_burst(dev, qid, pkts, newcnt);
+        dropped += netdev_dpdk_eth_tx_queue(dev, qid, pkts, newcnt);
     }
 
     if (OVS_UNLIKELY(dropped)) {
@@ -1981,7 +2014,7 @@  netdev_dpdk_send__(struct netdev_dpdk *dev, int qid,
         cnt = netdev_dpdk_qos_run(dev, pkts, cnt);
         dropped = batch->count - cnt;
 
-        dropped += netdev_dpdk_eth_tx_burst(dev, qid, pkts, cnt);
+        dropped += netdev_dpdk_eth_tx_queue(dev, qid, pkts, cnt);
 
         if (OVS_UNLIKELY(dropped)) {
             rte_spinlock_lock(&dev->stats_lock);