
[ovs-dev,RFC] netdev-dpdk: Add Tx intermediate queue for vhost ports.

Message ID 1492962104-89790-1-git-send-email-bhanuprakash.bodireddy@intel.com
State Changes Requested

Commit Message

Bodireddy, Bhanuprakash April 23, 2017, 3:41 p.m. UTC
This commit adds an intermediate queue for vHost-user ports. It
improves throughput in deployments with multiple virtual machines and
also in cases where the VM does packet forwarding in its kernel stack.

This patch is aligned with the intermediate queue implementation for dpdk
ports that can be found here: https://patchwork.ozlabs.org/patch/723309/

Signed-off-by: Bhanuprakash Bodireddy <bhanuprakash.bodireddy@intel.com>
Signed-off-by: Antonio Fischetti <antonio.fischetti@intel.com>
Co-authored-by: Antonio Fischetti <antonio.fischetti@intel.com>
---
- Limited testing has been done with this RFC patch; test scenarios include a
  VM doing IP forwarding (Linux stack) and running testpmd in the guest.
- Sanity testing has been done with multiple VMs to check for any locking
  issues or crashes.
- Much of the testing was done with a single queue, with only very basic
  testing of MQ.
- No throughput/latency tests have been done at this point.

TODO:
- The retry logic in 'netdev_dpdk_vhost_tx_burst' should be handled appropriately
  to lessen the throughput impact when multiple vHost-user ports are serviced by
  the same PMD. An option could be to make the number of retries configurable,
  with the default being no retries; one possible shape for this option is
  sketched below. During testing it was found that, most of the time, the second
  retry couldn't add a single packet to the RX queue with IP forwarding in the
  kernel stack.
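
A minimal sketch of how such a configurable option could look, assuming a
hypothetical per-interface 'tx-retries' key and a new 'vhost_tx_retries' field
in struct netdev_dpdk (neither exists in this patch);
'netdev_dpdk_vhost_tx_burst' would then loop on 'dev->vhost_tx_retries' instead
of the fixed VHOST_ENQ_RETRY_NUM:

static int
netdev_dpdk_vhost_set_config(struct netdev *netdev, const struct smap *args)
{
    struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);

    ovs_mutex_lock(&dev->mutex);
    /* Default to no retries; cap at VHOST_ENQ_RETRY_NUM so a slow guest
     * cannot stall the other ports serviced by the same PMD. */
    dev->vhost_tx_retries = smap_get_int(args, "tx-retries", 0);
    if (dev->vhost_tx_retries > VHOST_ENQ_RETRY_NUM) {
        dev->vhost_tx_retries = VHOST_ENQ_RETRY_NUM;
    }
    ovs_mutex_unlock(&dev->mutex);

    return 0;
}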

 lib/dpif-netdev.c     |  51 +++++++++++++++++++++-
 lib/netdev-dpdk.c     | 117 ++++++++++++++++++++++++++++++++++++++------------
 lib/netdev-dummy.c    |   1 +
 lib/netdev-linux.c    |   1 +
 lib/netdev-provider.h |   3 ++
 lib/netdev-vport.c    |   3 +-
 lib/netdev.c          |   9 ++++
 lib/netdev.h          |   1 +
 8 files changed, 156 insertions(+), 30 deletions(-)

Comments

Eelco Chaudron May 9, 2017, 3:25 p.m. UTC | #1
On 23/04/17 17:41, Bhanuprakash Bodireddy wrote:

 > This commit adds an intermediate queue for vHost-user ports. It
 > improves throughput in deployments with multiple virtual machines and
 > also in cases where the VM does packet forwarding in its kernel stack.
 >
 > This patch is aligned with the intermediate queue implementation for dpdk
 > ports that can be found here: https://patchwork.ozlabs.org/patch/723309/

This patch combined with the one above will increase throughput in general,
though at the cost of additional latency (see some numbers below).

However, I would still like to see both patches applied with a flush every tx
batch. This still increases performance when the rx batch has overlapping
egress ports, but avoids the latency increase.

It would be nice if you could run your latency tests with this flush included,
to see if you get the same results I got with this patch and the earlier one.
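
For illustration, a minimal sketch of the per-batch flush I have in mind,
reusing dp_netdev_drain_txq_ports() from this patch. It is simplified (the
cycle counting of the real dp_netdev_process_rxq_port() is omitted) and the
exact call site is an assumption:

static void
dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
                           struct netdev_rxq *rx, odp_port_t port_no)
{
    struct dp_packet_batch batch;
    int error;

    dp_packet_batch_init(&batch);
    error = netdev_rxq_recv(rx, &batch);
    if (!error) {
        *recirc_depth_get() = 0;
        dp_netdev_input(pmd, &batch, port_no);

        /* Flush the intermediate tx queues right after processing the rx
         * batch, so buffering adds at most one batch worth of latency. */
        dp_netdev_drain_txq_ports(pmd);
    }
}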

 > Signed-off-by: Bhanuprakash Bodireddy <bhanuprakash.bodireddy@intel.com>
 > Signed-off-by: Antonio Fischetti <antonio.fischetti@intel.com>
 > Co-authored-by: Antonio Fischetti <antonio.fischetti@intel.com>
 > ---
 > - Limited testing has been done with this RFC patch; test scenarios include a
 >   VM doing IP forwarding (Linux stack) and running testpmd in the guest.
 > - Sanity testing has been done with multiple VMs to check for any locking
 >   issues or crashes.
 > - Much of the testing was done with a single queue, with only very basic
 >   testing of MQ.
 > - No throughput/latency tests have been done at this point.

I did some quick latency and throughput tests (with only this patch applied).
Same test setup as for the other patch set, i.e. two 82599ES 10G ports with
64-byte packets being sent at wire speed:

Physical to Virtual test:

Number      plain                patch +
of flows  git clone    patch      flush
========  =========  =========  =========
10          5945899    8006593    7833914
32          3872211    6596310    6530133
50          3283713    5861894    6618711
100         3132540    5953752    5857226
500         2964499    5612901    5273006
1000        2931952    5233089    5178038


Physical to Virtual to Physical test:

Number      plain                patch +
of flows  git clone    patch      flush
========  =========  =========  =========
10          3240647    2659526    3652217
32          2136872    2060313    2834941
50          1981795    1912476    2897763
100         1794678    1798084    2014881
500         1686756    1672014    1657513
1000        1677795    1628578    1612480

The results for the latency tests mimic your test case 2 from the previous
patch set, sending 10G traffic @ wire speed:

===== GIT CLONE
Pkt size  min(ns)  avg(ns)  max(ns)
  512      10,011   12,100   281,915
1024       7,870    9,313   193,116
1280       7,862    9,036   194,439
1518       8,215    9,417   204,782

===== PATCH
Pkt size  min(ns)  avg(ns)  max(ns)
  512      25,044   28,244   774,921
1024      29,029   33,031   218,653
1280      26,464   30,097   203,083
1518      25,870   29,412   204,165

===== PATCH + FLUSH
Pkt size  min(ns)  avg(ns)  max(ns)
  512      10,492   13,655   281,538
1024       8,407    9,784   205,095
1280       8,399    9,750   194,888
1518       8,367    9,722   196,973


 > TODO:
 > - The retry logic in 'netdev_dpdk_vhost_tx_burst' should be handled
 >   appropriately to lessen the throughput impact when multiple vHost-user
 >   ports are serviced by the same PMD. An option could be to make the number
 >   of retries configurable, with the default being no retries. During testing
 >   it was found that, most of the time, the second retry couldn't add a single
 >   packet to the RX queue with IP forwarding in the kernel stack.

Taking the above into consideration, see my review comments inline. Also, I'll
assume this patch will be applied together with the other patch set.

 >
 >  lib/dpif-netdev.c     |  51 +++++++++++++++++++++-
 >  lib/netdev-dpdk.c     | 117 ++++++++++++++++++++++++++++++++++++++------------
 >  lib/netdev-dummy.c    |   1 +
 >  lib/netdev-linux.c    |   1 +
 >  lib/netdev-provider.h |   3 ++
 >  lib/netdev-vport.c    |   3 +-
 >  lib/netdev.c          |   9 ++++
 >  lib/netdev.h          |   1 +
 >  8 files changed, 156 insertions(+), 30 deletions(-)
 >
 > diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
 > index a14a2eb..4710985 100644
 > --- a/lib/dpif-netdev.c
 > +++ b/lib/dpif-netdev.c
 > @@ -344,6 +344,8 @@ struct dp_netdev_rxq {
 >      struct dp_netdev_pmd_thread *pmd;  /* pmd thread that will poll this queue. */
 >  };
 >
 > +#define LAST_USED_QID_NONE -1
 > +
 >  /* A port in a netdev-based datapath. */
 >  struct dp_netdev_port {
 >      odp_port_t port_no;
 > @@ -494,6 +496,8 @@ struct tx_port {
 >      int qid;
 >      long long last_used;
 >      struct hmap_node node;
 > +    int last_used_qid;    /* Last queue id where packets could be
 > +                             enqueued. */
 >  };
 >
 >  /* PMD: Poll modes drivers.  PMD accesses devices via polling to eliminate
 > @@ -3033,6 +3037,26 @@ cycles_count_end(struct dp_netdev_pmd_thread *pmd,
 >  }
 >
 >  static void
 > +dp_netdev_drain_txq_ports(struct dp_netdev_pmd_thread *pmd)
 > +{
 > +    struct tx_port *cached_tx_port;
 > +    int tx_qid;
 > +
 > +    HMAP_FOR_EACH (cached_tx_port, node, &pmd->send_port_cache) {
 > +        tx_qid = cached_tx_port->last_used_qid;
 > +
 > +        if (tx_qid != LAST_USED_QID_NONE) {
 > +            netdev_txq_drain(cached_tx_port->port->netdev, tx_qid,
 > +                         cached_tx_port->port->dynamic_txqs);
 > +
 > +            /* Queue drained; mark it empty. */
 > +            cached_tx_port->last_used_qid = LAST_USED_QID_NONE;
 > +        }
 > +    }
 > +}
 > +
 > +
 > +static void
 >  dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
 >                             struct netdev_rxq *rx,
 >                             odp_port_t port_no)
 > @@ -3647,15 +3671,18 @@ pmd_load_queues_and_ports(struct dp_netdev_pmd_thread *pmd,
 >      return i;
 >  }
 >
 > +enum { DRAIN_TSC = 20000ULL };
 > +
 >  static void *
 >  pmd_thread_main(void *f_)
 >  {
 >      struct dp_netdev_pmd_thread *pmd = f_;
 > -    unsigned int lc = 0;
 > +    unsigned int lc = 0, lc_drain = 0;
 >      struct polled_queue *poll_list;
 >      bool exiting;
 >      int poll_cnt;
 >      int i;
 > +    uint64_t prev = 0, now = 0;
 >
 >      poll_list = NULL;
 >
 > @@ -3688,6 +3715,17 @@ reload:
 >                                         poll_list[i].port_no);
 >          }
 >
 > +#define MAX_LOOP_TO_DRAIN 128
 > +        if (lc_drain++ > MAX_LOOP_TO_DRAIN) {
 > +            lc_drain = 0;
 > +            prev = now;
 > +            now = pmd->last_cycles;
 > +
 > +            if ((now - prev) > DRAIN_TSC) {
 > +                dp_netdev_drain_txq_ports(pmd);
 > +            }
 > +        }
 > +
 >          if (lc++ > 1024) {
 >              bool reload;
 >
 > @@ -4330,6 +4368,7 @@ dp_netdev_add_port_tx_to_pmd(struct dp_netdev_pmd_thread *pmd,
 >
 >      tx->port = port;
 >      tx->qid = -1;
 > +    tx->last_used_qid = LAST_USED_QID_NONE;
 >

Looks like this could lead to a queue not being drained. However, this does not
happen if the other patch is applied, so this patch should only be applied
together with the other one!

 >      hmap_insert(&pmd->tx_ports, &tx->node, hash_port_no(tx->port->port_no));
 >      pmd->need_reload = true;
 > @@ -4892,6 +4931,14 @@ dpif_netdev_xps_get_tx_qid(const struct dp_netdev_pmd_thread *pmd,
 >
 >      dpif_netdev_xps_revalidate_pmd(pmd, now, false);
 >
 > +    /* The tx queue can change in the XPS case, make sure packets in the
 > +     * previous queue are drained properly. */
 > +    if (tx->last_used_qid != LAST_USED_QID_NONE &&
 > +               tx->qid != tx->last_used_qid) {
 > +        netdev_txq_drain(port->netdev, tx->last_used_qid, port->dynamic_txqs);
 > +        tx->last_used_qid = LAST_USED_QID_NONE;
 > +    }
 > +
 >      VLOG_DBG("Core %d: New TX queue ID %d for port \'%s\'.",
 >               pmd->core_id, tx->qid, netdev_get_name(tx->port->netdev));
 >      return min_qid;
 > @@ -4987,6 +5034,8 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
 >                  tx_qid = pmd->static_tx_qid;
 >              }
 >
 > +            p->last_used_qid = tx_qid;
 > +
 >              netdev_send(p->port->netdev, tx_qid, packets_, may_steal,
 >                          dynamic_txqs);
 >              return;
 > diff --git a/lib/netdev-dpdk.c b/lib/netdev-dpdk.c
 > index ddc651b..26cfa85 100644
 > --- a/lib/netdev-dpdk.c
 > +++ b/lib/netdev-dpdk.c
 > @@ -286,6 +286,11 @@ struct dpdk_mp {
 >      struct ovs_list list_node OVS_GUARDED_BY(dpdk_mp_mutex);
 >  };
 >
 > +/* Queue 'INTERIM_QUEUE_BURST_THRESHOLD' packets before transmitting.
 > + * Defaults to 'NETDEV_MAX_BURST'(32) now.
 > + */
 > +#define INTERIM_QUEUE_BURST_THRESHOLD NETDEV_MAX_BURST
 > +
 >  /* There should be one 'struct dpdk_tx_queue' created for
 >   * each cpu core. */
 >  struct dpdk_tx_queue {
 > @@ -295,6 +300,11 @@ struct dpdk_tx_queue {
 >                                      * pmd threads (see 'concurrent_txq'). */
 >      int map;                       /* Mapping of configured vhost-user queues
 >                                      * to enabled by guest. */
 > +    struct dp_packet *pkts[INTERIM_QUEUE_BURST_THRESHOLD];
 > +                                   /* Intermediate queue where packets can
 > +                                    * be buffered for vhost ports */
 > +    int pkt_cnt;                   /* Number of packets waiting to be sent on
 > +                                    * vhost port */
 >  };
 >
 >  /* dpdk has no way to remove dpdk ring ethernet devices
 > @@ -1666,6 +1676,61 @@ netdev_dpdk_vhost_update_tx_counters(struct netdev_stats *stats,
 >      }
 >  }
 >
 > +static int
 > +netdev_dpdk_vhost_tx_burst(struct netdev_dpdk *dev, int qid,
 > +                           int dropped)

We should skip 'dropped' here, see below.

 > +{
 > +    struct dpdk_tx_queue *txq = &dev->tx_q[qid];
 > +    struct rte_mbuf **cur_pkts = (struct rte_mbuf **)txq->pkts;
 > +
 > +    int tx_vid = netdev_dpdk_get_vid(dev);
 > +    int tx_qid = qid * VIRTIO_QNUM + VIRTIO_RXQ;
 > +    uint32_t sent=0;
 > +    uint32_t retries=0;

Guess there should be spaces around '=' for the two assignments above?

 > +    uint32_t sum=0;
 > +    uint32_t total_pkts = 0;

No need to set 'sum' and 'total_pkts' to zero as they're set below.

 > +
 > +    total_pkts = sum = txq->pkt_cnt;
 > +    do {
 > +        uint32_t ret;
 > +        ret = rte_vhost_enqueue_burst(tx_vid, tx_qid, &cur_pkts[sent], sum);
 > +        if (!ret) {

Do we need an OVS_UNLIKELY() here?

 > +            /* No packets enqueued - do not retry. */
 > +            break;
 > +        } else {
 > +            /* Packets have been sent */
 > +            sent += ret;
 > +
 > +            /* 'sum' packets have to be retransmitted */
 > +            sum -= ret;
 > +        }
 > +    } while (sum && (retries++ < VHOST_ENQ_RETRY_NUM));
 > +
 > +    for (int i=0; i < total_pkts - dropped; i++) {
 > +        dp_packet_delete(txq->pkts[i]);

Guess here we should delete all the packets, not taking 'dropped' into account.
This has to do with the old function below, where QoS would have dropped
packets. Here the total number of packets we need to free is 'total_pkts', i.e.
the ones in our intermediate queue that we're flushing. A corrected sketch
follows below.
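
Something like the following (a sketch only, untested): drop the 'dropped'
argument, free everything that was buffered, and return the leftover count as
the number of drops:

static int
netdev_dpdk_vhost_tx_burst(struct netdev_dpdk *dev, int qid)
{
    struct dpdk_tx_queue *txq = &dev->tx_q[qid];
    struct rte_mbuf **cur_pkts = (struct rte_mbuf **) txq->pkts;
    int tx_vid = netdev_dpdk_get_vid(dev);
    int tx_qid = qid * VIRTIO_QNUM + VIRTIO_RXQ;
    uint32_t sent = 0;
    uint32_t retries = 0;
    uint32_t remaining = txq->pkt_cnt;

    do {
        uint32_t ret = rte_vhost_enqueue_burst(tx_vid, tx_qid,
                                               &cur_pkts[sent], remaining);
        if (OVS_UNLIKELY(!ret)) {
            /* No packets enqueued - do not retry. */
            break;
        }
        sent += ret;        /* 'ret' packets have been sent. */
        remaining -= ret;   /* 'remaining' packets may be retried. */
    } while (remaining && retries++ < VHOST_ENQ_RETRY_NUM);

    /* Free every packet buffered on the intermediate queue, whether it
     * made it to the guest or not. */
    for (int i = 0; i < txq->pkt_cnt; i++) {
        dp_packet_delete(txq->pkts[i]);
    }
    txq->pkt_cnt = 0;

    /* 'remaining' packets could not be enqueued and count as drops. */
    return remaining;
}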

 > +    }
 > +
 > +    /* Reset pkt count */
 > +    txq->pkt_cnt = 0;
 > +
 > +    /* 'sum' refers to packets dropped */
 > +    return sum;
 > +}
 > +
 > +static int
 > +netdev_dpdk_vhost_txq_drain(struct netdev *netdev, int qid,
 > +                            bool concurrent_txq OVS_UNUSED)
 > +{
 > +    struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
 > +    struct dpdk_tx_queue *txq = &dev->tx_q[qid];
 > +
 > +    if (OVS_LIKELY(txq->pkt_cnt)) {
 > +        netdev_dpdk_vhost_tx_burst(dev, qid, 0);
 > +    }
 > +
 > +    return 0;
 > +}
 > +

Why not keep the netdev_dpdk_txq_drain() function from your previous patch, or
maybe add a small comment on why 'concurrent_txq' is not needed here?
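
If locking does turn out to be needed here, one option is to take the queue's
tx_lock unconditionally, since __netdev_dpdk_vhost_send() always serializes on
it; a sketch (untested, keeping the tx_burst signature as posted):

static int
netdev_dpdk_vhost_txq_drain(struct netdev *netdev, int qid,
                            bool concurrent_txq OVS_UNUSED)
{
    struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
    struct dpdk_tx_queue *txq = &dev->tx_q[qid];

    if (OVS_LIKELY(txq->pkt_cnt)) {
        /* __netdev_dpdk_vhost_send() serializes on this lock, so the
         * drain has to as well, even without 'concurrent_txq'. */
        rte_spinlock_lock(&txq->tx_lock);
        netdev_dpdk_vhost_tx_burst(dev, qid, 0);
        rte_spinlock_unlock(&txq->tx_lock);
    }

    return 0;
}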


 >  static void
 >  __netdev_dpdk_vhost_send(struct netdev *netdev, int qid,
 >                           struct dp_packet **pkts, int cnt)
 > @@ -1674,16 +1739,20 @@ __netdev_dpdk_vhost_send(struct netdev *netdev, int qid,
 >      struct rte_mbuf **cur_pkts = (struct rte_mbuf **) pkts;
 >      unsigned int total_pkts = cnt;
 >      unsigned int dropped = 0;
 > -    int i, retries = 0;
 > +    int i;
 >
 >      qid = dev->tx_q[qid % netdev->n_txq].map;
 > +    struct dpdk_tx_queue *txq = &dev->tx_q[qid];
 >
 >      if (OVS_UNLIKELY(!is_vhost_running(dev) || qid < 0
 >                       || !(dev->flags & NETDEV_UP))) {
 >          rte_spinlock_lock(&dev->stats_lock);
 >          dev->stats.tx_dropped+= cnt;
 >          rte_spinlock_unlock(&dev->stats_lock);
 > -        goto out;
 > +
 > +        for (i = 0; i < total_pkts; i++) {
 > +            dp_packet_delete(pkts[i]);
 > +        }

Should we not bail out here?
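
I.e. something like this for the error path (a sketch; the early return
replaces the old 'goto out'):

    if (OVS_UNLIKELY(!is_vhost_running(dev) || qid < 0
                     || !(dev->flags & NETDEV_UP))) {
        rte_spinlock_lock(&dev->stats_lock);
        dev->stats.tx_dropped += cnt;
        rte_spinlock_unlock(&dev->stats_lock);

        for (i = 0; i < total_pkts; i++) {
            dp_packet_delete(pkts[i]);
        }
        return;    /* Bail out; nothing was queued for transmit. */
    }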

 >      }
 >
 >      rte_spinlock_lock(&dev->tx_q[qid].tx_lock);
 > @@ -1693,34 +1762,21 @@ __netdev_dpdk_vhost_send(struct netdev *netdev, int qid,
 >      cnt = netdev_dpdk_qos_run(dev, cur_pkts, cnt);
 >      dropped = total_pkts - cnt;
 >
 > -    do {
 > -        int vhost_qid = qid * VIRTIO_QNUM + VIRTIO_RXQ;
 > -        unsigned int tx_pkts;
 > -
 > -        tx_pkts = rte_vhost_enqueue_burst(netdev_dpdk_get_vid(dev),
 > -                                          vhost_qid, cur_pkts, cnt);
 > -        if (OVS_LIKELY(tx_pkts)) {
 > -            /* Packets have been sent.*/
 > -            cnt -= tx_pkts;
 > -            /* Prepare for possible retry.*/
 > -            cur_pkts = &cur_pkts[tx_pkts];
 > -        } else {
 > -            /* No packets sent - do not retry.*/
 > -            break;
 > +    int idx = 0;
 > +    while (idx < cnt) {
 > +        txq->pkts[txq->pkt_cnt++] = pkts[idx++];
 > +
 > +        if (txq->pkt_cnt >= INTERIM_QUEUE_BURST_THRESHOLD) {
 > +            dropped += netdev_dpdk_vhost_tx_burst(dev, qid, dropped);
 >          }
 > -    } while (cnt && (retries++ <= VHOST_ENQ_RETRY_NUM));
 > +    }
 >
 >      rte_spinlock_unlock(&dev->tx_q[qid].tx_lock);
 >
 >      rte_spinlock_lock(&dev->stats_lock);
 >      netdev_dpdk_vhost_update_tx_counters(&dev->stats, pkts, total_pkts,
 > -                                         cnt + dropped);
 > +                                         dropped);
 >      rte_spinlock_unlock(&dev->stats_lock);
 > -
 > -out:
 > -    for (i = 0; i < total_pkts - dropped; i++) {
 > -        dp_packet_delete(pkts[i]);
 > -    }
 >  }
 >
 >  /* Tx function. Transmit packets indefinitely */
 > @@ -3247,7 +3303,7 @@ unlock:
 >                            SET_CONFIG, SET_TX_MULTIQ, SEND,    \
 >                            GET_CARRIER, GET_STATS,             \
 >                            GET_FEATURES, GET_STATUS,           \
 > -                          RECONFIGURE, RXQ_RECV)              \
 > +                          RECONFIGURE, RXQ_RECV, TXQ_DRAIN)   \
 > {                                                             \
 >      NAME,                                                     \
 >      true,                       /* is_pmd */                  \
 > @@ -3314,6 +3370,7 @@ unlock:
 >      RXQ_RECV,                                                 \
 >      NULL,                       /* rx_wait */                 \
 >      NULL,                       /* rxq_drain */               \
 > +    TXQ_DRAIN,                  /* txq_drain */               \
 >  }
 >
 >  static const struct netdev_class dpdk_class =
 > @@ -3330,7 +3387,8 @@ static const struct netdev_class dpdk_class =
 >          netdev_dpdk_get_features,
 >          netdev_dpdk_get_status,
 >          netdev_dpdk_reconfigure,
 > -        netdev_dpdk_rxq_recv);
 > +        netdev_dpdk_rxq_recv,
 > +        NULL);
 >
 >  static const struct netdev_class dpdk_ring_class =
 >      NETDEV_DPDK_CLASS(
 > @@ -3346,7 +3404,8 @@ static const struct netdev_class dpdk_ring_class =
 >          netdev_dpdk_get_features,
 >          netdev_dpdk_get_status,
 >          netdev_dpdk_reconfigure,
 > -        netdev_dpdk_rxq_recv);
 > +        netdev_dpdk_rxq_recv,
 > +        NULL);
 >
 >  static const struct netdev_class dpdk_vhost_class =
 >      NETDEV_DPDK_CLASS(
 > @@ -3362,7 +3421,8 @@ static const struct netdev_class dpdk_vhost_class =
 >          NULL,
 >          NULL,
 >          netdev_dpdk_vhost_reconfigure,
 > -        netdev_dpdk_vhost_rxq_recv);
 > +        netdev_dpdk_vhost_rxq_recv,
 > +        netdev_dpdk_vhost_txq_drain);
 >  static const struct netdev_class dpdk_vhost_client_class =
 >      NETDEV_DPDK_CLASS(
 >          "dpdkvhostuserclient",
 > @@ -3377,7 +3437,8 @@ static const struct netdev_class dpdk_vhost_client_class =
 >          NULL,
 >          NULL,
 >          netdev_dpdk_vhost_client_reconfigure,
 > -        netdev_dpdk_vhost_rxq_recv);
 > +        netdev_dpdk_vhost_rxq_recv,
 > +        netdev_dpdk_vhost_txq_drain);
 >
 >  void
 >  netdev_dpdk_register(void)
 > diff --git a/lib/netdev-dummy.c b/lib/netdev-dummy.c
 > index 0657434..4ef659e 100644
 > --- a/lib/netdev-dummy.c
 > +++ b/lib/netdev-dummy.c
 > @@ -1409,6 +1409,7 @@ netdev_dummy_update_flags(struct netdev *netdev_,
 >      netdev_dummy_rxq_recv,                                      \
 >      netdev_dummy_rxq_wait,                                      \
 >      netdev_dummy_rxq_drain,                                     \
 > +    NULL,                                                       \
 >  }
 >
 >  static const struct netdev_class dummy_class =
 > diff --git a/lib/netdev-linux.c b/lib/netdev-linux.c
 > index 9ff1333..79478ee 100644
 > --- a/lib/netdev-linux.c
 > +++ b/lib/netdev-linux.c
 > @@ -2830,6 +2830,7 @@ netdev_linux_update_flags(struct netdev *netdev_, enum netdev_flags off,
 >      netdev_linux_rxq_recv,                                      \
 >      netdev_linux_rxq_wait,                                      \
 >      netdev_linux_rxq_drain,                                     \
 > +    NULL,                                                       \
 >  }
 >
 >  const struct netdev_class netdev_linux_class =
 > diff --git a/lib/netdev-provider.h b/lib/netdev-provider.h
 > index 8346fc4..5dd68db 100644
 > --- a/lib/netdev-provider.h
 > +++ b/lib/netdev-provider.h
 > @@ -769,6 +769,9 @@ struct netdev_class {
 >
 >      /* Discards all packets waiting to be received from 'rx'. */
 >      int (*rxq_drain)(struct netdev_rxq *rx);
 > +
 > +    /* Drain all packets waiting to be sent on queue 'qid'. */
 > +    int (*txq_drain)(struct netdev *netdev, int qid, bool concurrent_txq);
 >  };
 >
 >  int netdev_register_provider(const struct netdev_class *);
 > diff --git a/lib/netdev-vport.c b/lib/netdev-vport.c
 > index 39093e8..eb4b7d2 100644
 > --- a/lib/netdev-vport.c
 > +++ b/lib/netdev-vport.c
 > @@ -847,7 +847,8 @@ get_stats(const struct netdev *netdev, struct netdev_stats *stats)
 >      NULL,                   /* rx_dealloc */                \
 >      NULL,                   /* rx_recv */                   \
 >      NULL,                   /* rx_wait */                   \
 > -    NULL,                   /* rx_drain */
 > +    NULL,                   /* rx_drain */                  \
 > +    NULL,                   /* tx_drain */
 >
 >
 >  #define TUNNEL_CLASS(NAME, DPIF_PORT, BUILD_HEADER, PUSH_HEADER, POP_HEADER)   \
 > diff --git a/lib/netdev.c b/lib/netdev.c
 > index a8d8eda..b486b5d 100644
 > --- a/lib/netdev.c
 > +++ b/lib/netdev.c
 > @@ -678,6 +678,15 @@ netdev_rxq_drain(struct netdev_rxq *rx)
 >              : 0);
 >  }
 >
 > +/* Flush packets on the queue 'qid'. */
 > +int
 > +netdev_txq_drain(struct netdev *netdev, int qid, bool netdev_txq_drain)
 > +{
 > +    return (netdev->netdev_class->txq_drain
 > +            ? netdev->netdev_class->txq_drain(netdev, qid, netdev_txq_drain)
 > +            : EOPNOTSUPP);
 > +}
 > +
 >  /* Configures the number of tx queues of 'netdev'. Returns 0 if successful,
 >   * otherwise a positive errno value.
 >   *
 > diff --git a/lib/netdev.h b/lib/netdev.h
 > index d6c07c1..7ddd790 100644
 > --- a/lib/netdev.h
 > +++ b/lib/netdev.h
 > @@ -155,6 +155,7 @@ int netdev_rxq_drain(struct netdev_rxq *);
 >  int netdev_send(struct netdev *, int qid, struct dp_packet_batch *,
 >                  bool may_steal, bool concurrent_txq);
 >  void netdev_send_wait(struct netdev *, int qid);
 > +int netdev_txq_drain(struct netdev *, int qid, bool concurrent_txq);
 >
 >  /* native tunnel APIs */
 >  /* Structure to pass parameters required to build a tunnel header. */
Bodireddy, Bhanuprakash May 15, 2017, 12:43 p.m. UTC | #2
Hi Eelco,

> > This commit adds an intermediate queue for vHost-user ports. It
> > improves throughput in deployments with multiple virtual machines and
> > also in cases where the VM does packet forwarding in its kernel stack.
> >
> > This patch is aligned with the intermediate queue implementation for dpdk
> > ports that can be found here: https://patchwork.ozlabs.org/patch/723309/
>
>This patch combined with the one above will increase throughput in general,
>though at the cost of additional latency (see some numbers below).
>
>However, I would still like to see both patches applied with a flush every tx
>batch. This still increases performance when the rx batch has overlapping
>egress ports, but avoids the latency increase.
>
>It would be nice if you could run your latency tests with this flush included,
>to see if you get the same results I got with this patch and the earlier one.

Thanks for reviewing this patch and for all the feedback. I shall work on merging both the DPDK and vHost-user intermediate queue implementations and post the result to the ML, factoring in all your comments. I have to go back and recheck a few things regarding deleting the packets and the 'total_pkts' and 'dropped' counters.

>I did some quick latency and throughput tests (with only this patch applied).
>Same test setup as for the other patch set, i.e. two 82599ES 10G ports with
>64-byte packets being sent at wire speed:
>
>Physical to Virtual test:
>
>Number      plain                patch +
>of flows  git clone    patch      flush
>========  =========  =========  =========
>10          5945899    8006593    7833914
>32          3872211    6596310    6530133
>50          3283713    5861894    6618711
>100         3132540    5953752    5857226
>500         2964499    5612901    5273006
>1000        2931952    5233089    5178038
>
>
>Physical to Virtual to Physical test:
>
>Number      plain                patch +
>of flows  git clone    patch      flush
>========  =========  =========  =========
>10          3240647    2659526    3652217
>32          2136872    2060313    2834941
>50          1981795    1912476    2897763
>100         1794678    1798084    2014881
>500         1686756    1672014    1657513
>1000        1677795    1628578    1612480
>
>The results for the latency tests mimic your test case 2 from the previous
>patch set, sending 10G traffic @ wire speed:
>
>===== GIT CLONE
>Pkt size  min(ns)  avg(ns)  max(ns)
>  512      10,011   12,100   281,915
>1024       7,870    9,313   193,116
>1280       7,862    9,036   194,439
>1518       8,215    9,417   204,782
>
>===== PATCH
>Pkt size  min(ns)  avg(ns)  max(ns)
>  512      25,044   28,244   774,921
>1024      29,029   33,031   218,653
>1280      26,464   30,097   203,083
>1518      25,870   29,412   204,165
>
>===== PATCH + FLUSH
>Pkt size  min(ns)  avg(ns)  max(ns)
>  512      10,492   13,655   281,538
>1024       8,407    9,784   205,095
>1280       8,399    9,750   194,888
>1518       8,367    9,722   196,973
>

Many thanks, and I appreciate you taking the time to test the patch and post the throughput and latency details. I will also run the tests once I merge the patches and cross-check against the above numbers. I am also going to include the flush logic as suggested, to improve latency.

- Bhanuprakash.
Ciara Loftus May 26, 2017, 2:50 p.m. UTC | #3
> 
> This commit adds an intermediate queue for vHost-user ports. It
> improves throughput in deployments with multiple virtual machines and
> also in cases where the VM does packet forwarding in its kernel stack.
> 
> This patch is aligned with the intermediate queue implementation for dpdk
> ports that can be found here: https://patchwork.ozlabs.org/patch/723309/

As mentioned in previous reviews, it would be nice to see these two patches rebased and posted as a series, along with the combined performance figures.

> 
> Signed-off-by: Bhanuprakash Bodireddy
> <bhanuprakash.bodireddy@intel.com>
> Signed-off-by: Antonio Fischetti <antonio.fischetti@intel.com>
> Co-authored-by: Antonio Fischetti <antonio.fischetti@intel.com>
> ---
Bodireddy, Bhanuprakash June 7, 2017, 4:47 p.m. UTC | #4
Hi Eelco,

>On 23/04/17 17:41, Bhanuprakash Bodireddy wrote:
>
> > This commit adds an intermediate queue for vHost-user ports. It
> > improves throughput in deployments with multiple virtual machines and
> > also in cases where the VM does packet forwarding in its kernel stack.
> >
> > This patch is aligned with the intermediate queue implementation for dpdk
> > ports that can be found here: https://patchwork.ozlabs.org/patch/723309/
>
>This patch combined with the one above will increase throughput in general,
>though at the cost of additional latency (see some numbers below).
>
>However, I would still like to see both patches applied with a flush every tx
>batch. This still increases performance when the rx batch has overlapping
>egress ports, but avoids the latency increase.

I have posted a new patch series merging both the DPDK and vHost-user implementations of the intermediate queue.
Here is the patch: https://mail.openvswitch.org/pipermail/ovs-dev/2017-June/333515.html
I have factored in and addressed all your comments on the earlier versions in the new series. It would be helpful if you could provide your comments on it.

Bhanuprakash.

Patch

diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index a14a2eb..4710985 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -344,6 +344,8 @@  struct dp_netdev_rxq {
     struct dp_netdev_pmd_thread *pmd;  /* pmd thread that will poll this queue. */
 };
 
+#define LAST_USED_QID_NONE -1
+
 /* A port in a netdev-based datapath. */
 struct dp_netdev_port {
     odp_port_t port_no;
@@ -494,6 +496,8 @@  struct tx_port {
     int qid;
     long long last_used;
     struct hmap_node node;
+    int last_used_qid;    /* Last queue id where packets could be
+                             enqueued. */
 };
 
 /* PMD: Poll modes drivers.  PMD accesses devices via polling to eliminate
@@ -3033,6 +3037,26 @@  cycles_count_end(struct dp_netdev_pmd_thread *pmd,
 }
 
 static void
+dp_netdev_drain_txq_ports(struct dp_netdev_pmd_thread *pmd)
+{
+    struct tx_port *cached_tx_port;
+    int tx_qid;
+
+    HMAP_FOR_EACH (cached_tx_port, node, &pmd->send_port_cache) {
+        tx_qid = cached_tx_port->last_used_qid;
+
+        if (tx_qid != LAST_USED_QID_NONE) {
+            netdev_txq_drain(cached_tx_port->port->netdev, tx_qid,
+                         cached_tx_port->port->dynamic_txqs);
+
+            /* Queue drained; mark it empty. */
+            cached_tx_port->last_used_qid = LAST_USED_QID_NONE;
+        }
+    }
+}
+
+
+static void
 dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
                            struct netdev_rxq *rx,
                            odp_port_t port_no)
@@ -3647,15 +3671,18 @@  pmd_load_queues_and_ports(struct dp_netdev_pmd_thread *pmd,
     return i;
 }
 
+enum { DRAIN_TSC = 20000ULL };
+
 static void *
 pmd_thread_main(void *f_)
 {
     struct dp_netdev_pmd_thread *pmd = f_;
-    unsigned int lc = 0;
+    unsigned int lc = 0, lc_drain = 0;
     struct polled_queue *poll_list;
     bool exiting;
     int poll_cnt;
     int i;
+    uint64_t prev = 0, now = 0;
 
     poll_list = NULL;
 
@@ -3688,6 +3715,17 @@  reload:
                                        poll_list[i].port_no);
         }
 
+#define MAX_LOOP_TO_DRAIN 128
+        if (lc_drain++ > MAX_LOOP_TO_DRAIN) {
+            lc_drain = 0;
+            prev = now;
+            now = pmd->last_cycles;
+
+            if ((now - prev) > DRAIN_TSC) {
+                dp_netdev_drain_txq_ports(pmd);
+            }
+        }
+
         if (lc++ > 1024) {
             bool reload;
 
@@ -4330,6 +4368,7 @@  dp_netdev_add_port_tx_to_pmd(struct dp_netdev_pmd_thread *pmd,
 
     tx->port = port;
     tx->qid = -1;
+    tx->last_used_qid = LAST_USED_QID_NONE;
 
     hmap_insert(&pmd->tx_ports, &tx->node, hash_port_no(tx->port->port_no));
     pmd->need_reload = true;
@@ -4892,6 +4931,14 @@  dpif_netdev_xps_get_tx_qid(const struct dp_netdev_pmd_thread *pmd,
 
     dpif_netdev_xps_revalidate_pmd(pmd, now, false);
 
+    /* The tx queue can change in the XPS case, make sure packets in the
+     * previous queue are drained properly. */
+    if (tx->last_used_qid != LAST_USED_QID_NONE &&
+               tx->qid != tx->last_used_qid) {
+        netdev_txq_drain(port->netdev, tx->last_used_qid, port->dynamic_txqs);
+        tx->last_used_qid = LAST_USED_QID_NONE;
+    }
+
     VLOG_DBG("Core %d: New TX queue ID %d for port \'%s\'.",
              pmd->core_id, tx->qid, netdev_get_name(tx->port->netdev));
     return min_qid;
@@ -4987,6 +5034,8 @@  dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
                 tx_qid = pmd->static_tx_qid;
             }
 
+            p->last_used_qid = tx_qid;
+
             netdev_send(p->port->netdev, tx_qid, packets_, may_steal,
                         dynamic_txqs);
             return;
diff --git a/lib/netdev-dpdk.c b/lib/netdev-dpdk.c
index ddc651b..26cfa85 100644
--- a/lib/netdev-dpdk.c
+++ b/lib/netdev-dpdk.c
@@ -286,6 +286,11 @@  struct dpdk_mp {
     struct ovs_list list_node OVS_GUARDED_BY(dpdk_mp_mutex);
 };
 
+/* Queue 'INTERIM_QUEUE_BURST_THRESHOLD' packets before transmitting.
+ * Defaults to 'NETDEV_MAX_BURST'(32) now.
+ */
+#define INTERIM_QUEUE_BURST_THRESHOLD NETDEV_MAX_BURST
+
 /* There should be one 'struct dpdk_tx_queue' created for
  * each cpu core. */
 struct dpdk_tx_queue {
@@ -295,6 +300,11 @@  struct dpdk_tx_queue {
                                     * pmd threads (see 'concurrent_txq'). */
     int map;                       /* Mapping of configured vhost-user queues
                                     * to enabled by guest. */
+    struct dp_packet *pkts[INTERIM_QUEUE_BURST_THRESHOLD];
+                                   /* Intermediate queue where packets can
+                                    * be buffered for vhost ports */
+    int pkt_cnt;                   /* Number of packets waiting to be sent on
+                                    * vhost port */
 };
 
 /* dpdk has no way to remove dpdk ring ethernet devices
@@ -1666,6 +1676,61 @@  netdev_dpdk_vhost_update_tx_counters(struct netdev_stats *stats,
     }
 }
 
+static int
+netdev_dpdk_vhost_tx_burst(struct netdev_dpdk *dev, int qid,
+                           int dropped)
+{
+    struct dpdk_tx_queue *txq = &dev->tx_q[qid];
+    struct rte_mbuf **cur_pkts = (struct rte_mbuf **)txq->pkts;
+
+    int tx_vid = netdev_dpdk_get_vid(dev);
+    int tx_qid = qid * VIRTIO_QNUM + VIRTIO_RXQ;
+    uint32_t sent=0;
+    uint32_t retries=0;
+    uint32_t sum=0;
+    uint32_t total_pkts = 0;
+
+    total_pkts = sum = txq->pkt_cnt;
+    do {
+        uint32_t ret;
+        ret = rte_vhost_enqueue_burst(tx_vid, tx_qid, &cur_pkts[sent], sum);
+        if (!ret) {
+            /* No packets enqueued - do not retry. */
+            break;
+        } else {
+            /* Packets have been sent */
+            sent += ret;
+
+            /* 'sum' packets have to be retransmitted */
+            sum -= ret;
+        }
+    } while (sum && (retries++ < VHOST_ENQ_RETRY_NUM));
+
+    for (int i=0; i < total_pkts - dropped; i++) {
+        dp_packet_delete(txq->pkts[i]);
+    }
+
+    /* Reset pkt count */
+    txq->pkt_cnt = 0;
+
+    /* 'sum' refers to packets dropped */
+    return sum;
+}
+
+static int
+netdev_dpdk_vhost_txq_drain(struct netdev *netdev, int qid,
+                            bool concurrent_txq OVS_UNUSED)
+{
+    struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
+    struct dpdk_tx_queue *txq = &dev->tx_q[qid];
+
+    if (OVS_LIKELY(txq->pkt_cnt)) {
+        netdev_dpdk_vhost_tx_burst(dev, qid, 0);
+    }
+
+    return 0;
+}
+
 static void
 __netdev_dpdk_vhost_send(struct netdev *netdev, int qid,
                          struct dp_packet **pkts, int cnt)
@@ -1674,16 +1739,20 @@  __netdev_dpdk_vhost_send(struct netdev *netdev, int qid,
     struct rte_mbuf **cur_pkts = (struct rte_mbuf **) pkts;
     unsigned int total_pkts = cnt;
     unsigned int dropped = 0;
-    int i, retries = 0;
+    int i;
 
     qid = dev->tx_q[qid % netdev->n_txq].map;
+    struct dpdk_tx_queue *txq = &dev->tx_q[qid];
 
     if (OVS_UNLIKELY(!is_vhost_running(dev) || qid < 0
                      || !(dev->flags & NETDEV_UP))) {
         rte_spinlock_lock(&dev->stats_lock);
         dev->stats.tx_dropped+= cnt;
         rte_spinlock_unlock(&dev->stats_lock);
-        goto out;
+
+        for (i = 0; i < total_pkts; i++) {
+            dp_packet_delete(pkts[i]);
+        }
     }
 
     rte_spinlock_lock(&dev->tx_q[qid].tx_lock);
@@ -1693,34 +1762,21 @@  __netdev_dpdk_vhost_send(struct netdev *netdev, int qid,
     cnt = netdev_dpdk_qos_run(dev, cur_pkts, cnt);
     dropped = total_pkts - cnt;
 
-    do {
-        int vhost_qid = qid * VIRTIO_QNUM + VIRTIO_RXQ;
-        unsigned int tx_pkts;
-
-        tx_pkts = rte_vhost_enqueue_burst(netdev_dpdk_get_vid(dev),
-                                          vhost_qid, cur_pkts, cnt);
-        if (OVS_LIKELY(tx_pkts)) {
-            /* Packets have been sent.*/
-            cnt -= tx_pkts;
-            /* Prepare for possible retry.*/
-            cur_pkts = &cur_pkts[tx_pkts];
-        } else {
-            /* No packets sent - do not retry.*/
-            break;
+    int idx = 0;
+    while (idx < cnt) {
+        txq->pkts[txq->pkt_cnt++] = pkts[idx++];
+
+        if (txq->pkt_cnt >= INTERIM_QUEUE_BURST_THRESHOLD) {
+            dropped += netdev_dpdk_vhost_tx_burst(dev, qid, dropped);
         }
-    } while (cnt && (retries++ <= VHOST_ENQ_RETRY_NUM));
+    }
 
     rte_spinlock_unlock(&dev->tx_q[qid].tx_lock);
 
     rte_spinlock_lock(&dev->stats_lock);
     netdev_dpdk_vhost_update_tx_counters(&dev->stats, pkts, total_pkts,
-                                         cnt + dropped);
+                                         dropped);
     rte_spinlock_unlock(&dev->stats_lock);
-
-out:
-    for (i = 0; i < total_pkts - dropped; i++) {
-        dp_packet_delete(pkts[i]);
-    }
 }
 
 /* Tx function. Transmit packets indefinitely */
@@ -3247,7 +3303,7 @@  unlock:
                           SET_CONFIG, SET_TX_MULTIQ, SEND,    \
                           GET_CARRIER, GET_STATS,             \
                           GET_FEATURES, GET_STATUS,           \
-                          RECONFIGURE, RXQ_RECV)              \
+                          RECONFIGURE, RXQ_RECV, TXQ_DRAIN)   \
 {                                                             \
     NAME,                                                     \
     true,                       /* is_pmd */                  \
@@ -3314,6 +3370,7 @@  unlock:
     RXQ_RECV,                                                 \
     NULL,                       /* rx_wait */                 \
     NULL,                       /* rxq_drain */               \
+    TXQ_DRAIN,                  /* txq_drain */               \
 }
 
 static const struct netdev_class dpdk_class =
@@ -3330,7 +3387,8 @@  static const struct netdev_class dpdk_class =
         netdev_dpdk_get_features,
         netdev_dpdk_get_status,
         netdev_dpdk_reconfigure,
-        netdev_dpdk_rxq_recv);
+        netdev_dpdk_rxq_recv,
+        NULL);
 
 static const struct netdev_class dpdk_ring_class =
     NETDEV_DPDK_CLASS(
@@ -3346,7 +3404,8 @@  static const struct netdev_class dpdk_ring_class =
         netdev_dpdk_get_features,
         netdev_dpdk_get_status,
         netdev_dpdk_reconfigure,
-        netdev_dpdk_rxq_recv);
+        netdev_dpdk_rxq_recv,
+        NULL);
 
 static const struct netdev_class dpdk_vhost_class =
     NETDEV_DPDK_CLASS(
@@ -3362,7 +3421,8 @@  static const struct netdev_class dpdk_vhost_class =
         NULL,
         NULL,
         netdev_dpdk_vhost_reconfigure,
-        netdev_dpdk_vhost_rxq_recv);
+        netdev_dpdk_vhost_rxq_recv,
+        netdev_dpdk_vhost_txq_drain);
 static const struct netdev_class dpdk_vhost_client_class =
     NETDEV_DPDK_CLASS(
         "dpdkvhostuserclient",
@@ -3377,7 +3437,8 @@  static const struct netdev_class dpdk_vhost_client_class =
         NULL,
         NULL,
         netdev_dpdk_vhost_client_reconfigure,
-        netdev_dpdk_vhost_rxq_recv);
+        netdev_dpdk_vhost_rxq_recv,
+        netdev_dpdk_vhost_txq_drain);
 
 void
 netdev_dpdk_register(void)
diff --git a/lib/netdev-dummy.c b/lib/netdev-dummy.c
index 0657434..4ef659e 100644
--- a/lib/netdev-dummy.c
+++ b/lib/netdev-dummy.c
@@ -1409,6 +1409,7 @@  netdev_dummy_update_flags(struct netdev *netdev_,
     netdev_dummy_rxq_recv,                                      \
     netdev_dummy_rxq_wait,                                      \
     netdev_dummy_rxq_drain,                                     \
+    NULL,                                                       \
 }
 
 static const struct netdev_class dummy_class =
diff --git a/lib/netdev-linux.c b/lib/netdev-linux.c
index 9ff1333..79478ee 100644
--- a/lib/netdev-linux.c
+++ b/lib/netdev-linux.c
@@ -2830,6 +2830,7 @@  netdev_linux_update_flags(struct netdev *netdev_, enum netdev_flags off,
     netdev_linux_rxq_recv,                                      \
     netdev_linux_rxq_wait,                                      \
     netdev_linux_rxq_drain,                                     \
+    NULL,                                                       \
 }
 
 const struct netdev_class netdev_linux_class =
diff --git a/lib/netdev-provider.h b/lib/netdev-provider.h
index 8346fc4..5dd68db 100644
--- a/lib/netdev-provider.h
+++ b/lib/netdev-provider.h
@@ -769,6 +769,9 @@  struct netdev_class {
 
     /* Discards all packets waiting to be received from 'rx'. */
     int (*rxq_drain)(struct netdev_rxq *rx);
+
+    /* Drain all packets waiting to be sent on queue 'qid'. */
+    int (*txq_drain)(struct netdev *netdev, int qid, bool concurrent_txq);
 };
 
 int netdev_register_provider(const struct netdev_class *);
diff --git a/lib/netdev-vport.c b/lib/netdev-vport.c
index 39093e8..eb4b7d2 100644
--- a/lib/netdev-vport.c
+++ b/lib/netdev-vport.c
@@ -847,7 +847,8 @@  get_stats(const struct netdev *netdev, struct netdev_stats *stats)
     NULL,                   /* rx_dealloc */                \
     NULL,                   /* rx_recv */                   \
     NULL,                   /* rx_wait */                   \
-    NULL,                   /* rx_drain */
+    NULL,                   /* rx_drain */                  \
+    NULL,                   /* tx_drain */
 
 
 #define TUNNEL_CLASS(NAME, DPIF_PORT, BUILD_HEADER, PUSH_HEADER, POP_HEADER)   \
diff --git a/lib/netdev.c b/lib/netdev.c
index a8d8eda..b486b5d 100644
--- a/lib/netdev.c
+++ b/lib/netdev.c
@@ -678,6 +678,15 @@  netdev_rxq_drain(struct netdev_rxq *rx)
             : 0);
 }
 
+/* Flush packets on the queue 'qid'. */
+int
+netdev_txq_drain(struct netdev *netdev, int qid, bool netdev_txq_drain)
+{
+    return (netdev->netdev_class->txq_drain
+            ? netdev->netdev_class->txq_drain(netdev, qid, netdev_txq_drain)
+            : EOPNOTSUPP);
+}
+
 /* Configures the number of tx queues of 'netdev'. Returns 0 if successful,
  * otherwise a positive errno value.
  *
diff --git a/lib/netdev.h b/lib/netdev.h
index d6c07c1..7ddd790 100644
--- a/lib/netdev.h
+++ b/lib/netdev.h
@@ -155,6 +155,7 @@  int netdev_rxq_drain(struct netdev_rxq *);
 int netdev_send(struct netdev *, int qid, struct dp_packet_batch *,
                 bool may_steal, bool concurrent_txq);
 void netdev_send_wait(struct netdev *, int qid);
+int netdev_txq_drain(struct netdev *, int qid, bool concurrent_txq);
 
 /* native tunnel APIs */
 /* Structure to pass parameters required to build a tunnel header. */