Message ID: 1492962104-89790-1-git-send-email-bhanuprakash.bodireddy@intel.com
State: Changes Requested
On 23/04/17 17:41, Bhanuprakash Bodireddy wrote:
> This commit adds the intermediate queue for vHost-user ports. It
> improves the throughput in multiple virtual machines deployments and
> also in cases with VM doing packet forwarding in kernel stack.
>
> This patch is aligned with intermediate queue implementation for dpdk
> ports that can be found here: https://patchwork.ozlabs.org/patch/723309/

This patch and the one above combined will increase throughput in
general, however at the cost of additional latency (see some numbers
below).

However I would still like to see both patches applied with a flush
every tx batch. This still increases performance if the rx batch has
overlapping egress ports, but avoids the latency increase.

It would be nice if you could do your latency tests with this flush
included, to see if you get the same results I got with this patch and
the earlier one.

> Signed-off-by: Bhanuprakash Bodireddy <bhanuprakash.bodireddy@intel.com>
> Signed-off-by: Antonio Fischetti <antonio.fischetti@intel.com>
> Co-authored-by: Antonio Fischetti <antonio.fischetti@intel.com>
> ---
> - Limited testing is done with this RFC patch, test scenarios include a
>   VM doing ip forwarding (Linux stack) and running testpmd in the guest.
> - Sanity testing is done with multiple VMs to check for any locking/crashes.
> - Much of the testing is done with a single queue, and very basic testing with MQ.
> - No throughput/latency tests are done at this point.

I did do some quick latency and throughput tests (with only this patch
applied). Same test setup as for the other patch set, i.e. two 82599ES
10G ports with 64 byte packets being sent at wire speed:

Physical to Virtual test:

  Number      plain              patch +
of flows  git clone      patch     flush
========  =========  =========  =========
      10    5945899    8006593    7833914
      32    3872211    6596310    6530133
      50    3283713    5861894    6618711
     100    3132540    5953752    5857226
     500    2964499    5612901    5273006
    1000    2931952    5233089    5178038

Physical to Virtual to Physical test:

  Number      plain              patch +
of flows  git clone      patch     flush
========  =========  =========  =========
      10    3240647    2659526    3652217
      32    2136872    2060313    2834941
      50    1981795    1912476    2897763
     100    1794678    1798084    2014881
     500    1686756    1672014    1657513
    1000    1677795    1628578    1612480

The results for the latency tests mimic your test case 2 from the
previous patch set, sending 10G traffic @ wire speed:

===== GIT CLONE
Pkt size  min(ns)  avg(ns)  max(ns)
     512   10,011   12,100  281,915
    1024    7,870    9,313  193,116
    1280    7,862    9,036  194,439
    1518    8,215    9,417  204,782

===== PATCH
Pkt size  min(ns)  avg(ns)  max(ns)
     512   25,044   28,244  774,921
    1024   29,029   33,031  218,653
    1280   26,464   30,097  203,083
    1518   25,870   29,412  204,165

===== PATCH + FLUSH
Pkt size  min(ns)  avg(ns)  max(ns)
     512   10,492   13,655  281,538
    1024    8,407    9,784  205,095
    1280    8,399    9,750  194,888
    1518    8,367    9,722  196,973

> TODO:
> - Retry logic in 'netdev_dpdk_vhost_tx_burst' should be handled
>   appropriately to lessen the throughput impact when multiple vHost-user
>   ports are serviced by the same PMD. An option could be to allow a
>   configurable 'retries' option, the default being no retries. During
>   testing it was found that the second retry couldn't add a single packet
>   to the RX queue most of the time with ip forwarding in kernel stack*.

Taking the above into consideration, see my review comments inline.
Also, I'll assume this patch will be applied together with the other
patch set.
>  lib/dpif-netdev.c     |  51 +++++++++++++++++++++-
>  lib/netdev-dpdk.c     | 117 ++++++++++++++++++++++++++++++++++++++------------
>  lib/netdev-dummy.c    |   1 +
>  lib/netdev-linux.c    |   1 +
>  lib/netdev-provider.h |   3 ++
>  lib/netdev-vport.c    |   3 +-
>  lib/netdev.c          |   9 ++++
>  lib/netdev.h          |   1 +
>  8 files changed, 156 insertions(+), 30 deletions(-)
>
> diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
> index a14a2eb..4710985 100644
> --- a/lib/dpif-netdev.c
> +++ b/lib/dpif-netdev.c
> @@ -344,6 +344,8 @@ struct dp_netdev_rxq {
>      struct dp_netdev_pmd_thread *pmd;  /* pmd thread that will poll this queue. */
>  };
>
> +#define LAST_USED_QID_NONE -1
> +
>  /* A port in a netdev-based datapath. */
>  struct dp_netdev_port {
>      odp_port_t port_no;
> @@ -494,6 +496,8 @@ struct tx_port {
>      int qid;
>      long long last_used;
>      struct hmap_node node;
> +    int last_used_qid;    /* Last queue id where packets could be
> +                             enqueued. */
>  };
>
>  /* PMD: Poll modes drivers. PMD accesses devices via polling to eliminate
> @@ -3033,6 +3037,26 @@ cycles_count_end(struct dp_netdev_pmd_thread *pmd,
>  }
>
>  static void
> +dp_netdev_drain_txq_ports(struct dp_netdev_pmd_thread *pmd)
> +{
> +    struct tx_port *cached_tx_port;
> +    int tx_qid;
> +
> +    HMAP_FOR_EACH (cached_tx_port, node, &pmd->send_port_cache) {
> +        tx_qid = cached_tx_port->last_used_qid;
> +
> +        if (tx_qid != LAST_USED_QID_NONE) {
> +            netdev_txq_drain(cached_tx_port->port->netdev, tx_qid,
> +                             cached_tx_port->port->dynamic_txqs);
> +
> +            /* Queue drained and mark it empty. */
> +            cached_tx_port->last_used_qid = LAST_USED_QID_NONE;
> +        }
> +    }
> +}
> +
> +
> +static void
>  dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
>                             struct netdev_rxq *rx,
>                             odp_port_t port_no)
> @@ -3647,15 +3671,18 @@ pmd_load_queues_and_ports(struct dp_netdev_pmd_thread *pmd,
>      return i;
>  }
>
> +enum { DRAIN_TSC = 20000ULL };
> +
>  static void *
>  pmd_thread_main(void *f_)
>  {
>      struct dp_netdev_pmd_thread *pmd = f_;
> -    unsigned int lc = 0;
> +    unsigned int lc = 0, lc_drain = 0;
>      struct polled_queue *poll_list;
>      bool exiting;
>      int poll_cnt;
>      int i;
> +    uint64_t prev = 0, now = 0;
>
>      poll_list = NULL;
>
> @@ -3688,6 +3715,17 @@ reload:
>                                         poll_list[i].port_no);
>          }
>
> +#define MAX_LOOP_TO_DRAIN 128
> +        if (lc_drain++ > MAX_LOOP_TO_DRAIN) {
> +            lc_drain = 0;
> +            prev = now;
> +            now = pmd->last_cycles;
> +
> +            if ((now - prev) > DRAIN_TSC) {
> +                dp_netdev_drain_txq_ports(pmd);
> +            }
> +        }
> +
>          if (lc++ > 1024) {
>              bool reload;
>
> @@ -4330,6 +4368,7 @@ dp_netdev_add_port_tx_to_pmd(struct dp_netdev_pmd_thread *pmd,
>
>      tx->port = port;
>      tx->qid = -1;
> +    tx->last_used_qid = LAST_USED_QID_NONE;

Looks like this could lead to a queue not being drained. However, this is
not happening if the other patch is applied. So this patch should only be
applied together with the other one!

>      hmap_insert(&pmd->tx_ports, &tx->node, hash_port_no(tx->port->port_no));
>      pmd->need_reload = true;
> @@ -4892,6 +4931,14 @@ dpif_netdev_xps_get_tx_qid(const struct dp_netdev_pmd_thread *pmd,
>
>      dpif_netdev_xps_revalidate_pmd(pmd, now, false);
>
> +    /* The tx queue can change in XPS case, make sure packets in previous
> +     * queue is drained properly. */
> +    if (tx->last_used_qid != LAST_USED_QID_NONE &&
> +        tx->qid != tx->last_used_qid) {
> +        netdev_txq_drain(port->netdev, tx->last_used_qid, port->dynamic_txqs);
> +        tx->last_used_qid = LAST_USED_QID_NONE;
> +    }
> +
>      VLOG_DBG("Core %d: New TX queue ID %d for port \'%s\'.",
>               pmd->core_id, tx->qid, netdev_get_name(tx->port->netdev));
>      return min_qid;
> @@ -4987,6 +5034,8 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
>              tx_qid = pmd->static_tx_qid;
>          }
>
> +        p->last_used_qid = tx_qid;
> +
>          netdev_send(p->port->netdev, tx_qid, packets_, may_steal,
>                      dynamic_txqs);
>          return;
> diff --git a/lib/netdev-dpdk.c b/lib/netdev-dpdk.c
> index ddc651b..26cfa85 100644
> --- a/lib/netdev-dpdk.c
> +++ b/lib/netdev-dpdk.c
> @@ -286,6 +286,11 @@ struct dpdk_mp {
>      struct ovs_list list_node OVS_GUARDED_BY(dpdk_mp_mutex);
>  };
>
> +/* Queue 'INTERIM_QUEUE_BURST_THRESHOLD' packets before tranmitting.
> + * Defaults to 'NETDEV_MAX_BURST'(32) now.
> + */
> +#define INTERIM_QUEUE_BURST_THRESHOLD NETDEV_MAX_BURST
> +
>  /* There should be one 'struct dpdk_tx_queue' created for
>   * each cpu core. */
>  struct dpdk_tx_queue {
> @@ -295,6 +300,11 @@ struct dpdk_tx_queue {
>                                      * pmd threads (see 'concurrent_txq'). */
>      int map;                       /* Mapping of configured vhost-user queues
>                                      * to enabled by guest. */
> +    struct dp_packet *pkts[INTERIM_QUEUE_BURST_THRESHOLD];
> +                                   /* Intermediate queue where packets can
> +                                    * be buffered for vhost ports */
> +    int pkt_cnt;                   /* Number of packets waiting to be sent on
> +                                    * vhost port */
>  };
>
>  /* dpdk has no way to remove dpdk ring ethernet devices
> @@ -1666,6 +1676,61 @@ netdev_dpdk_vhost_update_tx_counters(struct netdev_stats *stats,
>      }
>  }
>
> +static int
> +netdev_dpdk_vhost_tx_burst(struct netdev_dpdk *dev, int qid,
> +                           int dropped)

We should skip 'dropped' here, see below.

> +{
> +    struct dpdk_tx_queue *txq = &dev->tx_q[qid];
> +    struct rte_mbuf **cur_pkts = (struct rte_mbuf **)txq->pkts;
> +
> +    int tx_vid = netdev_dpdk_get_vid(dev);
> +    int tx_qid = qid * VIRTIO_QNUM + VIRTIO_RXQ;
> +    uint32_t sent=0;
> +    uint32_t retries=0;

Guess there should be spaces around = for the 2 above?

> +    uint32_t sum=0;
> +    uint32_t total_pkts = 0;

No need to set sum and total_pkts to zero, as they're set below.

> +
> +    total_pkts = sum = txq->pkt_cnt;
> +    do {
> +        uint32_t ret;
> +        ret = rte_vhost_enqueue_burst(tx_vid, tx_qid, &cur_pkts[sent], sum);
> +        if (!ret) {

Do we need an OVS_UNLIKELY() here?

> +            /* No packets enqueued - do not retry. */
> +            break;
> +        } else {
> +            /* Packet have been sent */
> +            sent += ret;
> +
> +            /* 'sum; packet have to be retransmitted */
> +            sum -= ret;
> +        }
> +    } while (sum && (retries++ < VHOST_ENQ_RETRY_NUM));
> +
> +    for (int i=0; i < total_pkts - dropped; i++) {
> +        dp_packet_delete(txq->pkts[i]);

Guess here we should delete all the packets, not taking the dropped into
account. This has to do with the old function below, where qos would have
dropped packets. Here the total number of packets we need to free is
total_pkts, i.e. the ones in our intermediate queue we're flushing.

> +    }
> +
> +    /* Reset pkt count */
> +    txq->pkt_cnt = 0;
> +
> +    /* 'sum' refers to packets dropped */
> +    return sum;
> +}
> +
> +static int
> +netdev_dpdk_vhost_txq_drain(struct netdev *netdev, int qid,
> +                            bool concurrent_txq OVS_UNUSED)
> +{
> +    struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
> +    struct dpdk_tx_queue *txq = &dev->tx_q[qid];
> +
> +    if (OVS_LIKELY(txq->pkt_cnt)) {
> +        netdev_dpdk_vhost_tx_burst(dev, qid, 0);
> +    }
> +
> +    return 0;
> +}
> +

Why not keep the netdev_dpdk_txq_drain() function from your previous
patch, or maybe add a small comment why concurrent_txq's are not needed?
>  static void
>  __netdev_dpdk_vhost_send(struct netdev *netdev, int qid,
>                           struct dp_packet **pkts, int cnt)
> @@ -1674,16 +1739,20 @@ __netdev_dpdk_vhost_send(struct netdev *netdev, int qid,
>      struct rte_mbuf **cur_pkts = (struct rte_mbuf **) pkts;
>      unsigned int total_pkts = cnt;
>      unsigned int dropped = 0;
> -    int i, retries = 0;
> +    int i;
>
>      qid = dev->tx_q[qid % netdev->n_txq].map;
> +    struct dpdk_tx_queue *txq = &dev->tx_q[qid];
>
>      if (OVS_UNLIKELY(!is_vhost_running(dev) || qid < 0
>                       || !(dev->flags & NETDEV_UP))) {
>          rte_spinlock_lock(&dev->stats_lock);
>          dev->stats.tx_dropped+= cnt;
>          rte_spinlock_unlock(&dev->stats_lock);
> -        goto out;
> +
> +        for (i = 0; i < total_pkts; i++) {
> +            dp_packet_delete(pkts[i]);
> +        }

Should we not bail out here?

>      }
>
>      rte_spinlock_lock(&dev->tx_q[qid].tx_lock);
> @@ -1693,34 +1762,21 @@ __netdev_dpdk_vhost_send(struct netdev *netdev, int qid,
>      cnt = netdev_dpdk_qos_run(dev, cur_pkts, cnt);
>      dropped = total_pkts - cnt;
>
> -    do {
> -        int vhost_qid = qid * VIRTIO_QNUM + VIRTIO_RXQ;
> -        unsigned int tx_pkts;
> -
> -        tx_pkts = rte_vhost_enqueue_burst(netdev_dpdk_get_vid(dev),
> -                                          vhost_qid, cur_pkts, cnt);
> -        if (OVS_LIKELY(tx_pkts)) {
> -            /* Packets have been sent.*/
> -            cnt -= tx_pkts;
> -            /* Prepare for possible retry.*/
> -            cur_pkts = &cur_pkts[tx_pkts];
> -        } else {
> -            /* No packets sent - do not retry.*/
> -            break;
> +    int idx = 0;
> +    while (idx < cnt) {
> +        txq->pkts[txq->pkt_cnt++] = pkts[idx++];
> +
> +        if (txq->pkt_cnt >= INTERIM_QUEUE_BURST_THRESHOLD) {
> +            dropped += netdev_dpdk_vhost_tx_burst(dev, qid, dropped);
>          }
> -    } while (cnt && (retries++ <= VHOST_ENQ_RETRY_NUM));
> +    }
>
>      rte_spinlock_unlock(&dev->tx_q[qid].tx_lock);
>
>      rte_spinlock_lock(&dev->stats_lock);
>      netdev_dpdk_vhost_update_tx_counters(&dev->stats, pkts, total_pkts,
> -                                         cnt + dropped);
> +                                         dropped);
>      rte_spinlock_unlock(&dev->stats_lock);
> -
> -out:
> -    for (i = 0; i < total_pkts - dropped; i++) {
> -        dp_packet_delete(pkts[i]);
> -    }
>  }
>
>  /* Tx function. Transmit packets indefinitely */
> @@ -3247,7 +3303,7 @@ unlock:
>                            SET_CONFIG, SET_TX_MULTIQ, SEND,    \
>                            GET_CARRIER, GET_STATS,             \
>                            GET_FEATURES, GET_STATUS,           \
> -                          RECONFIGURE, RXQ_RECV)              \
> +                          RECONFIGURE, RXQ_RECV, TXQ_DRAIN)   \
>  {                                                             \
>      NAME,                                                     \
>      true,                       /* is_pmd */                  \
> @@ -3314,6 +3370,7 @@ unlock:
>      RXQ_RECV,                                                 \
>      NULL,                       /* rx_wait */                 \
>      NULL,                       /* rxq_drain */               \
> +    TXQ_DRAIN,                  /* txq_drain */               \
>  }
>
>  static const struct netdev_class dpdk_class =
> @@ -3330,7 +3387,8 @@ static const struct netdev_class dpdk_class =
>          netdev_dpdk_get_features,
>          netdev_dpdk_get_status,
>          netdev_dpdk_reconfigure,
> -        netdev_dpdk_rxq_recv);
> +        netdev_dpdk_rxq_recv,
> +        NULL);
>
>  static const struct netdev_class dpdk_ring_class =
>      NETDEV_DPDK_CLASS(
> @@ -3346,7 +3404,8 @@ static const struct netdev_class dpdk_ring_class =
>          netdev_dpdk_get_features,
>          netdev_dpdk_get_status,
>          netdev_dpdk_reconfigure,
> -        netdev_dpdk_rxq_recv);
> +        netdev_dpdk_rxq_recv,
> +        NULL);
>
>  static const struct netdev_class dpdk_vhost_class =
>      NETDEV_DPDK_CLASS(
> @@ -3362,7 +3421,8 @@ static const struct netdev_class dpdk_vhost_class =
>          NULL,
>          NULL,
>          netdev_dpdk_vhost_reconfigure,
> -        netdev_dpdk_vhost_rxq_recv);
> +        netdev_dpdk_vhost_rxq_recv,
> +        netdev_dpdk_vhost_txq_drain);
>  static const struct netdev_class dpdk_vhost_client_class =
>      NETDEV_DPDK_CLASS(
>          "dpdkvhostuserclient",
> @@ -3377,7 +3437,8 @@ static const struct netdev_class dpdk_vhost_client_class =
>          NULL,
>          NULL,
>          netdev_dpdk_vhost_client_reconfigure,
> -        netdev_dpdk_vhost_rxq_recv);
> +        netdev_dpdk_vhost_rxq_recv,
> +        netdev_dpdk_vhost_txq_drain);
>
>  void
>  netdev_dpdk_register(void)
> diff --git a/lib/netdev-dummy.c b/lib/netdev-dummy.c
> index 0657434..4ef659e 100644
> --- a/lib/netdev-dummy.c
> +++ b/lib/netdev-dummy.c
> @@ -1409,6 +1409,7 @@ netdev_dummy_update_flags(struct netdev *netdev_,
>      netdev_dummy_rxq_recv,                                      \
>      netdev_dummy_rxq_wait,                                      \
>      netdev_dummy_rxq_drain,                                     \
> +    NULL,                                                       \
>  }
>
>  static const struct netdev_class dummy_class =
> diff --git a/lib/netdev-linux.c b/lib/netdev-linux.c
> index 9ff1333..79478ee 100644
> --- a/lib/netdev-linux.c
> +++ b/lib/netdev-linux.c
> @@ -2830,6 +2830,7 @@ netdev_linux_update_flags(struct netdev *netdev_, enum netdev_flags off,
>      netdev_linux_rxq_recv,                                      \
>      netdev_linux_rxq_wait,                                      \
>      netdev_linux_rxq_drain,                                     \
> +    NULL,                                                       \
>  }
>
>  const struct netdev_class netdev_linux_class =
> diff --git a/lib/netdev-provider.h b/lib/netdev-provider.h
> index 8346fc4..5dd68db 100644
> --- a/lib/netdev-provider.h
> +++ b/lib/netdev-provider.h
> @@ -769,6 +769,9 @@ struct netdev_class {
>
>      /* Discards all packets waiting to be received from 'rx'. */
>      int (*rxq_drain)(struct netdev_rxq *rx);
> +
> +    /* Drain all packets waiting to be sent on queue 'qid'. */
> +    int (*txq_drain)(struct netdev *netdev, int qid, bool concurrent_txq);
>  };
>
>  int netdev_register_provider(const struct netdev_class *);
> diff --git a/lib/netdev-vport.c b/lib/netdev-vport.c
> index 39093e8..eb4b7d2 100644
> --- a/lib/netdev-vport.c
> +++ b/lib/netdev-vport.c
> @@ -847,7 +847,8 @@ get_stats(const struct netdev *netdev, struct netdev_stats *stats)
>      NULL,                   /* rx_dealloc */                \
>      NULL,                   /* rx_recv */                   \
>      NULL,                   /* rx_wait */                   \
> -    NULL,                   /* rx_drain */
> +    NULL,                   /* rx_drain */                  \
> +    NULL,                   /* tx_drain */
>
>
>  #define TUNNEL_CLASS(NAME, DPIF_PORT, BUILD_HEADER, PUSH_HEADER, POP_HEADER) \
> diff --git a/lib/netdev.c b/lib/netdev.c
> index a8d8eda..b486b5d 100644
> --- a/lib/netdev.c
> +++ b/lib/netdev.c
> @@ -678,6 +678,15 @@ netdev_rxq_drain(struct netdev_rxq *rx)
>              : 0);
>  }
>
> +/* Flush packets on the queue 'qid'. */
> +int
> +netdev_txq_drain(struct netdev *netdev, int qid, bool netdev_txq_drain)
> +{
> +    return (netdev->netdev_class->txq_drain
> +            ? netdev->netdev_class->txq_drain(netdev, qid, netdev_txq_drain)
> +            : EOPNOTSUPP);
> +}
> +
>  /* Configures the number of tx queues of 'netdev'. Returns 0 if successful,
>   * otherwise a positive errno value.
>   *
> diff --git a/lib/netdev.h b/lib/netdev.h
> index d6c07c1..7ddd790 100644
> --- a/lib/netdev.h
> +++ b/lib/netdev.h
> @@ -155,6 +155,7 @@ int netdev_rxq_drain(struct netdev_rxq *);
>  int netdev_send(struct netdev *, int qid, struct dp_packet_batch *,
>                  bool may_steal, bool concurrent_txq);
>  void netdev_send_wait(struct netdev *, int qid);
> +int netdev_txq_drain(struct netdev *, int qid, bool concurrent_txq);
>
>  /* native tunnel APIs */
>  /* Structure to pass parameters required to build a tunnel header. */
Hi Eelco,

>> This commit adds the intermediate queue for vHost-user ports. It
>> improves the throughput in multiple virtual machines deployments and
>> also in cases with VM doing packet forwarding in kernel stack.
>>
>> This patch is aligned with intermediate queue implementation for dpdk
>> ports that can be found here: https://patchwork.ozlabs.org/patch/723309/
>
> This patch and the one above combined will increase throughput in
> general, however at the cost of additional latency (see some numbers
> below).
>
> However I still would like to see both patches applied with a flush
> every tx batch. This still increases performance if the rx batch has
> overlapping egress ports, but avoids the latency increase.
>
> It would be nice if you could do your latency tests with this flush
> included to see if you get the same results I got with this patch and
> the earlier one.

Thanks for reviewing this patch and for all the feedback. I shall work on
merging both the DPDK and vHost-user intermediate queue implementations
and post the result to the ML, factoring in all your comments. I have to
go back and recheck a few things w.r.t. deleting the packets and the
'total_pkts' and 'dropped' counters.

> I did do some quick latency and throughput tests (with only this patch
> applied). Same test setup as for the other patch set, i.e. two 82599ES
> 10G ports with 64 byte packets being sent at wire speed:
>
> Physical to Virtual test:
>
>   Number      plain              patch +
> of flows  git clone      patch     flush
> ========  =========  =========  =========
>       10    5945899    8006593    7833914
>       32    3872211    6596310    6530133
>       50    3283713    5861894    6618711
>      100    3132540    5953752    5857226
>      500    2964499    5612901    5273006
>     1000    2931952    5233089    5178038
>
> Physical to Virtual to Physical test:
>
>   Number      plain              patch +
> of flows  git clone      patch     flush
> ========  =========  =========  =========
>       10    3240647    2659526    3652217
>       32    2136872    2060313    2834941
>       50    1981795    1912476    2897763
>      100    1794678    1798084    2014881
>      500    1686756    1672014    1657513
>     1000    1677795    1628578    1612480
>
> The results for the latency tests mimic your test case 2 from the
> previous patch set, sending 10G traffic @ wire speed:
>
> ===== GIT CLONE
> Pkt size  min(ns)  avg(ns)  max(ns)
>      512   10,011   12,100  281,915
>     1024    7,870    9,313  193,116
>     1280    7,862    9,036  194,439
>     1518    8,215    9,417  204,782
>
> ===== PATCH
> Pkt size  min(ns)  avg(ns)  max(ns)
>      512   25,044   28,244  774,921
>     1024   29,029   33,031  218,653
>     1280   26,464   30,097  203,083
>     1518   25,870   29,412  204,165
>
> ===== PATCH + FLUSH
> Pkt size  min(ns)  avg(ns)  max(ns)
>      512   10,492   13,655  281,538
>     1024    8,407    9,784  205,095
>     1280    8,399    9,750  194,888
>     1518    8,367    9,722  196,973

Many thanks, and I appreciate you taking the time to test the patch and
post the throughput and latency details. I will also do the tests once I
merge the patches and cross-check with the above numbers. Also, I am
going to include the flush logic as suggested to improve latency.

- Bhanuprakash.
> This commit adds the intermediate queue for vHost-user ports. It
> improves the throughput in multiple virtual machines deployments and
> also in cases with VM doing packet forwarding in kernel stack.
>
> This patch is aligned with intermediate queue implementation for dpdk
> ports that can be found here: https://patchwork.ozlabs.org/patch/723309/

As mentioned in previous reviews, it would be nice to see these two
patches rebased and posted as a series, together with the combined
performance figures.

> Signed-off-by: Bhanuprakash Bodireddy <bhanuprakash.bodireddy@intel.com>
> Signed-off-by: Antonio Fischetti <antonio.fischetti@intel.com>
> Co-authored-by: Antonio Fischetti <antonio.fischetti@intel.com>
> ---
Hi Eelco,

> On 23/04/17 17:41, Bhanuprakash Bodireddy wrote:
>> This commit adds the intermediate queue for vHost-user ports. It
>> improves the throughput in multiple virtual machines deployments and
>> also in cases with VM doing packet forwarding in kernel stack.
>>
>> This patch is aligned with intermediate queue implementation for dpdk
>> ports that can be found here: https://patchwork.ozlabs.org/patch/723309/
>
> This patch and the one above combined will increase throughput in
> general, however at the cost of additional latency (see some numbers
> below).
>
> However I still would like to see both patches applied with a flush
> every tx batch. This still increases performance if the rx batch has
> overlapping egress ports, but avoids the latency increase.

I have posted a new patch series merging both the DPDK and vHost-user
implementations of the intermediate queue. Here is the patch:
https://mail.openvswitch.org/pipermail/ovs-dev/2017-June/333515.html

I have factored in and addressed all your comments on the earlier
versions in the new series. It would be helpful if you could provide
your comments on this.

Bhanuprakash.