Message ID | 1498775976-4142-5-git-send-email-bhanuprakash.bodireddy@intel.com |
---|---|
State | Not Applicable |
Delegated to: | Darrell Ball |
Headers | show |
-----Original Message----- From: <ovs-dev-bounces@openvswitch.org> on behalf of Bhanuprakash Bodireddy <bhanuprakash.bodireddy@intel.com> Date: Thursday, June 29, 2017 at 3:39 PM To: "dev@openvswitch.org" <dev@openvswitch.org> Subject: [ovs-dev] [PATCH v3 4/6] netdev-dpdk: Add intermediate queue support. This commit introduces netdev_dpdk_eth_tx_queue() function that implements intermediate queue and packet buffering. The packets get buffered till the threshold 'INTERIM_QUEUE_BURST_THRESHOLD[32] is reached and eventually gets transmitted. To handle the case(eg: ping) where packets are sent at low rate and can potentially get stuck in the queue, flush logic is implemented that gets invoked from dp_netdev_flush_txq_ports() as part of PMD packet processing loop. Signed-off-by: Bhanuprakash Bodireddy <bhanuprakash.bodireddy@intel.com> Signed-off-by: Antonio Fischetti <antonio.fischetti@intel.com> Co-authored-by: Antonio Fischetti <antonio.fischetti@intel.com> Signed-off-by: Markus Magnusson <markus.magnusson@ericsson.com> Co-authored-by: Markus Magnusson <markus.magnusson@ericsson.com> Acked-by: Eelco Chaudron <echaudro@redhat.com> --- lib/dpif-netdev.c | 44 +++++++++++++++++++++++++++++++++++++++++++- lib/netdev-dpdk.c | 37 +++++++++++++++++++++++++++++++++++-- 2 files changed, 78 insertions(+), 3 deletions(-) diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c index 4e29085..7e1f5bc 100644 --- a/lib/dpif-netdev.c +++ b/lib/dpif-netdev.c @@ -332,6 +332,7 @@ enum pmd_cycles_counter_type { }; #define XPS_TIMEOUT_MS 500LL +#define LAST_USED_QID_NONE -1 /* Contained by struct dp_netdev_port's 'rxqs' member. */ struct dp_netdev_rxq { @@ -492,7 +493,13 @@ struct rxq_poll { struct tx_port { struct dp_netdev_port *port; int qid; - long long last_used; + int last_used_qid; /* Last queue id where packets got + enqueued. */ + long long last_used; /* In case XPS is enabled, it contains the + * timestamp of the last time the port was + * used by the thread to send data. After + * XPS_TIMEOUT_MS elapses the qid will be + * marked as -1. */ struct hmap_node node; }; @@ -3080,6 +3087,25 @@ cycles_count_end(struct dp_netdev_pmd_thread *pmd, } static void +dp_netdev_flush_txq_ports(struct dp_netdev_pmd_thread *pmd) +{ + struct tx_port *cached_tx_port; + int tx_qid; + + HMAP_FOR_EACH (cached_tx_port, node, &pmd->send_port_cache) { + tx_qid = cached_tx_port->last_used_qid; + + if (tx_qid != LAST_USED_QID_NONE) { + netdev_txq_flush(cached_tx_port->port->netdev, tx_qid, + cached_tx_port->port->dynamic_txqs); + + /* Queue flushed and mark it empty. */ + cached_tx_port->last_used_qid = LAST_USED_QID_NONE; + } + } +} + Could you move this function and I think the other code in dpif-netdev.c to patch 6, if you can ? This function is unused, so will generate a build error with –Werror when applied in sequence and logically this seems like it can go into patch 6. Darrell +static void dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd, struct netdev_rxq *rx, odp_port_t port_no) @@ -4355,6 +4381,7 @@ dp_netdev_add_port_tx_to_pmd(struct dp_netdev_pmd_thread *pmd, tx->port = port; tx->qid = -1; + tx->last_used_qid = LAST_USED_QID_NONE; hmap_insert(&pmd->tx_ports, &tx->node, hash_port_no(tx->port->port_no)); pmd->need_reload = true; @@ -4925,6 +4952,14 @@ dpif_netdev_xps_get_tx_qid(const struct dp_netdev_pmd_thread *pmd, dpif_netdev_xps_revalidate_pmd(pmd, now, false); + /* The tx queue can change in XPS case, make sure packets in previous + * queue is flushed properly. */ + if (tx->last_used_qid != LAST_USED_QID_NONE && + tx->qid != tx->last_used_qid) { + netdev_txq_flush(port->netdev, tx->last_used_qid, port->dynamic_txqs); + tx->last_used_qid = LAST_USED_QID_NONE; + } + VLOG_DBG("Core %d: New TX queue ID %d for port \'%s\'.", pmd->core_id, tx->qid, netdev_get_name(tx->port->netdev)); return min_qid; @@ -5020,6 +5055,13 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_, tx_qid = pmd->static_tx_qid; } + /* In case these packets gets buffered into an intermediate + * queue and XPS is enabled the flush function could find a + * different tx qid assigned to its thread. We keep track + * of the qid we're now using, that will trigger the flush + * function and will select the right queue to flush. */ + p->last_used_qid = tx_qid; + netdev_send(p->port->netdev, tx_qid, packets_, may_steal, dynamic_txqs); return; diff --git a/lib/netdev-dpdk.c b/lib/netdev-dpdk.c index 51d528d..99ad8c7 100644 --- a/lib/netdev-dpdk.c +++ b/lib/netdev-dpdk.c @@ -1444,6 +1444,7 @@ static inline int netdev_dpdk_eth_tx_burst(struct netdev_dpdk *dev, int qid, struct rte_mbuf **pkts, int cnt) { + struct dpdk_tx_queue *txq = &dev->tx_q[qid]; uint32_t nb_tx = 0; while (nb_tx != cnt) { @@ -1467,6 +1468,7 @@ netdev_dpdk_eth_tx_burst(struct netdev_dpdk *dev, int qid, } } + txq->dpdk_pkt_cnt = 0; return cnt - nb_tx; } @@ -1866,6 +1868,37 @@ out: } } +/* Enqueue packets in an intermediate queue and call the flush + * function when the queue is full. This way we can amortize the + * cost of MMIO writes. */ +static inline int +netdev_dpdk_eth_tx_queue(struct netdev_dpdk *dev, int qid, + struct rte_mbuf **pkts, int cnt) +{ + struct dpdk_tx_queue *txq = &dev->tx_q[qid]; + + int i = 0; + int dropped = 0; + + while (i < cnt) { + int freeslots = INTERIM_QUEUE_BURST_THRESHOLD - txq->dpdk_pkt_cnt; + int tocopy = MIN(freeslots, cnt-i); + + memcpy(&txq->dpdk_burst_pkts[txq->dpdk_pkt_cnt], &pkts[i], + tocopy * sizeof (struct rte_mbuf *)); + + txq->dpdk_pkt_cnt += tocopy; + i += tocopy; + + /* Queue full, burst the packets. */ + if (txq->dpdk_pkt_cnt >= INTERIM_QUEUE_BURST_THRESHOLD) { + dropped += netdev_dpdk_eth_tx_burst(dev, qid, txq->dpdk_burst_pkts, + txq->dpdk_pkt_cnt); + } + } + return dropped; +} + /* Tx function. Transmit packets indefinitely */ static void dpdk_do_tx_copy(struct netdev *netdev, int qid, struct dp_packet_batch *batch) @@ -1923,7 +1956,7 @@ dpdk_do_tx_copy(struct netdev *netdev, int qid, struct dp_packet_batch *batch) newcnt = netdev_dpdk_qos_run(dev, pkts, newcnt); dropped += qos_pkts - newcnt; - dropped += netdev_dpdk_eth_tx_burst(dev, qid, pkts, newcnt); + dropped += netdev_dpdk_eth_tx_queue(dev, qid, pkts, newcnt); } if (OVS_UNLIKELY(dropped)) { @@ -1981,7 +2014,7 @@ netdev_dpdk_send__(struct netdev_dpdk *dev, int qid, cnt = netdev_dpdk_qos_run(dev, pkts, cnt); dropped = batch->count - cnt; - dropped += netdev_dpdk_eth_tx_burst(dev, qid, pkts, cnt); + dropped += netdev_dpdk_eth_tx_queue(dev, qid, pkts, cnt); if (OVS_UNLIKELY(dropped)) { rte_spinlock_lock(&dev->stats_lock); -- 2.4.11 _______________________________________________ dev mailing list dev@openvswitch.org https://urldefense.proofpoint.com/v2/url?u=https-3A__mail.openvswitch.org_mailman_listinfo_ovs-2Ddev&d=DwICAg&c=uilaK90D4TOVoH58JNXRgQ&r=BVhFA09CGX7JQ5Ih-uZnsw&m=KQ9FwtpeTsOPh2w-yxlJqvY1HiABokd4SQ_8Y8Yivjo&s=c38_T5x1uiPW0evVYSXnJecekG0acF-Vd5JhF4DuhN8&e=
> > This commit introduces netdev_dpdk_eth_tx_queue() function that > implements intermediate queue and packet buffering. The packets get > buffered till the threshold 'INTERIM_QUEUE_BURST_THRESHOLD[32] is > reached and eventually gets transmitted. > > To handle the case(eg: ping) where packets are sent at low rate and > can potentially get stuck in the queue, flush logic is implemented > that gets invoked from dp_netdev_flush_txq_ports() as part of PMD packet > processing loop. > > Signed-off-by: Bhanuprakash Bodireddy ><bhanuprakash.bodireddy@intel.com> > Signed-off-by: Antonio Fischetti <antonio.fischetti@intel.com> > Co-authored-by: Antonio Fischetti <antonio.fischetti@intel.com> > Signed-off-by: Markus Magnusson <markus.magnusson@ericsson.com> > Co-authored-by: Markus Magnusson <markus.magnusson@ericsson.com> > Acked-by: Eelco Chaudron <echaudro@redhat.com> > --- > lib/dpif-netdev.c | 44 >+++++++++++++++++++++++++++++++++++++++++++- > lib/netdev-dpdk.c | 37 +++++++++++++++++++++++++++++++++++-- > 2 files changed, 78 insertions(+), 3 deletions(-) > > diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c > index 4e29085..7e1f5bc 100644 > --- a/lib/dpif-netdev.c > +++ b/lib/dpif-netdev.c > @@ -332,6 +332,7 @@ enum pmd_cycles_counter_type { > }; > > #define XPS_TIMEOUT_MS 500LL > +#define LAST_USED_QID_NONE -1 > > /* Contained by struct dp_netdev_port's 'rxqs' member. */ > struct dp_netdev_rxq { > @@ -492,7 +493,13 @@ struct rxq_poll { > struct tx_port { > struct dp_netdev_port *port; > int qid; > - long long last_used; > + int last_used_qid; /* Last queue id where packets got > + enqueued. */ > + long long last_used; /* In case XPS is enabled, it contains the > + * timestamp of the last time the port was > + * used by the thread to send data. After > + * XPS_TIMEOUT_MS elapses the qid will be > + * marked as -1. */ > struct hmap_node node; > }; > > @@ -3080,6 +3087,25 @@ cycles_count_end(struct >dp_netdev_pmd_thread *pmd, > } > > static void > +dp_netdev_flush_txq_ports(struct dp_netdev_pmd_thread *pmd) > +{ > + struct tx_port *cached_tx_port; > + int tx_qid; > + > + HMAP_FOR_EACH (cached_tx_port, node, &pmd->send_port_cache) { > + tx_qid = cached_tx_port->last_used_qid; > + > + if (tx_qid != LAST_USED_QID_NONE) { > + netdev_txq_flush(cached_tx_port->port->netdev, tx_qid, > + cached_tx_port->port->dynamic_txqs); > + > + /* Queue flushed and mark it empty. */ > + cached_tx_port->last_used_qid = LAST_USED_QID_NONE; > + } > + } > +} > + > >Could you move this function and I think the other code in dpif-netdev.c to >patch 6, if you can ? Should be a simple change. Will do this. >This function is unused, so will generate a build error with –Werror when >applied in sequence and logically this seems like it can go into patch 6. Completely agree. - Bhanuprakash. > >Darrell > > > +static void > dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd, > struct netdev_rxq *rx, > odp_port_t port_no) > @@ -4355,6 +4381,7 @@ dp_netdev_add_port_tx_to_pmd(struct >dp_netdev_pmd_thread *pmd, > > tx->port = port; > tx->qid = -1; > + tx->last_used_qid = LAST_USED_QID_NONE; > > hmap_insert(&pmd->tx_ports, &tx->node, hash_port_no(tx->port- >>port_no)); > pmd->need_reload = true; > @@ -4925,6 +4952,14 @@ dpif_netdev_xps_get_tx_qid(const struct >dp_netdev_pmd_thread *pmd, > > dpif_netdev_xps_revalidate_pmd(pmd, now, false); > > + /* The tx queue can change in XPS case, make sure packets in previous > + * queue is flushed properly. */ > + if (tx->last_used_qid != LAST_USED_QID_NONE && > + tx->qid != tx->last_used_qid) { > + netdev_txq_flush(port->netdev, tx->last_used_qid, port- >>dynamic_txqs); > + tx->last_used_qid = LAST_USED_QID_NONE; > + } > + > VLOG_DBG("Core %d: New TX queue ID %d for port \'%s\'.", > pmd->core_id, tx->qid, netdev_get_name(tx->port->netdev)); > return min_qid; > @@ -5020,6 +5055,13 @@ dp_execute_cb(void *aux_, struct >dp_packet_batch *packets_, > tx_qid = pmd->static_tx_qid; > } > > + /* In case these packets gets buffered into an intermediate > + * queue and XPS is enabled the flush function could find a > + * different tx qid assigned to its thread. We keep track > + * of the qid we're now using, that will trigger the flush > + * function and will select the right queue to flush. */ > + p->last_used_qid = tx_qid; > + > netdev_send(p->port->netdev, tx_qid, packets_, may_steal, > dynamic_txqs); > return; > diff --git a/lib/netdev-dpdk.c b/lib/netdev-dpdk.c > index 51d528d..99ad8c7 100644 > --- a/lib/netdev-dpdk.c > +++ b/lib/netdev-dpdk.c > @@ -1444,6 +1444,7 @@ static inline int > netdev_dpdk_eth_tx_burst(struct netdev_dpdk *dev, int qid, > struct rte_mbuf **pkts, int cnt) > { > + struct dpdk_tx_queue *txq = &dev->tx_q[qid]; > uint32_t nb_tx = 0; > > while (nb_tx != cnt) { > @@ -1467,6 +1468,7 @@ netdev_dpdk_eth_tx_burst(struct netdev_dpdk >*dev, int qid, > } > } > > + txq->dpdk_pkt_cnt = 0; > return cnt - nb_tx; > } > > @@ -1866,6 +1868,37 @@ out: > } > } > > +/* Enqueue packets in an intermediate queue and call the flush > + * function when the queue is full. This way we can amortize the > + * cost of MMIO writes. */ > +static inline int > +netdev_dpdk_eth_tx_queue(struct netdev_dpdk *dev, int qid, > + struct rte_mbuf **pkts, int cnt) > +{ > + struct dpdk_tx_queue *txq = &dev->tx_q[qid]; > + > + int i = 0; > + int dropped = 0; > + > + while (i < cnt) { > + int freeslots = INTERIM_QUEUE_BURST_THRESHOLD - txq- >>dpdk_pkt_cnt; > + int tocopy = MIN(freeslots, cnt-i); > + > + memcpy(&txq->dpdk_burst_pkts[txq->dpdk_pkt_cnt], &pkts[i], > + tocopy * sizeof (struct rte_mbuf *)); > + > + txq->dpdk_pkt_cnt += tocopy; > + i += tocopy; > + > + /* Queue full, burst the packets. */ > + if (txq->dpdk_pkt_cnt >= INTERIM_QUEUE_BURST_THRESHOLD) { > + dropped += netdev_dpdk_eth_tx_burst(dev, qid, txq- >>dpdk_burst_pkts, > + txq->dpdk_pkt_cnt); > + } > + } > + return dropped; > +} > + > /* Tx function. Transmit packets indefinitely */ > static void > dpdk_do_tx_copy(struct netdev *netdev, int qid, struct dp_packet_batch >*batch) > @@ -1923,7 +1956,7 @@ dpdk_do_tx_copy(struct netdev *netdev, int qid, >struct dp_packet_batch *batch) > newcnt = netdev_dpdk_qos_run(dev, pkts, newcnt); > > dropped += qos_pkts - newcnt; > - dropped += netdev_dpdk_eth_tx_burst(dev, qid, pkts, newcnt); > + dropped += netdev_dpdk_eth_tx_queue(dev, qid, pkts, newcnt); > } > > if (OVS_UNLIKELY(dropped)) { > @@ -1981,7 +2014,7 @@ netdev_dpdk_send__(struct netdev_dpdk *dev, >int qid, > cnt = netdev_dpdk_qos_run(dev, pkts, cnt); > dropped = batch->count - cnt; > > - dropped += netdev_dpdk_eth_tx_burst(dev, qid, pkts, cnt); > + dropped += netdev_dpdk_eth_tx_queue(dev, qid, pkts, cnt); > > if (OVS_UNLIKELY(dropped)) { > rte_spinlock_lock(&dev->stats_lock); > -- > 2.4.11 > > _______________________________________________ > dev mailing list > dev@openvswitch.org > https://urldefense.proofpoint.com/v2/url?u=https- >3A__mail.openvswitch.org_mailman_listinfo_ovs- >2Ddev&d=DwICAg&c=uilaK90D4TOVoH58JNXRgQ&r=BVhFA09CGX7JQ5Ih- >uZnsw&m=KQ9FwtpeTsOPh2w- >yxlJqvY1HiABokd4SQ_8Y8Yivjo&s=c38_T5x1uiPW0evVYSXnJecekG0acF- >Vd5JhF4DuhN8&e= > > > > >
diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c index 4e29085..7e1f5bc 100644 --- a/lib/dpif-netdev.c +++ b/lib/dpif-netdev.c @@ -332,6 +332,7 @@ enum pmd_cycles_counter_type { }; #define XPS_TIMEOUT_MS 500LL +#define LAST_USED_QID_NONE -1 /* Contained by struct dp_netdev_port's 'rxqs' member. */ struct dp_netdev_rxq { @@ -492,7 +493,13 @@ struct rxq_poll { struct tx_port { struct dp_netdev_port *port; int qid; - long long last_used; + int last_used_qid; /* Last queue id where packets got + enqueued. */ + long long last_used; /* In case XPS is enabled, it contains the + * timestamp of the last time the port was + * used by the thread to send data. After + * XPS_TIMEOUT_MS elapses the qid will be + * marked as -1. */ struct hmap_node node; }; @@ -3080,6 +3087,25 @@ cycles_count_end(struct dp_netdev_pmd_thread *pmd, } static void +dp_netdev_flush_txq_ports(struct dp_netdev_pmd_thread *pmd) +{ + struct tx_port *cached_tx_port; + int tx_qid; + + HMAP_FOR_EACH (cached_tx_port, node, &pmd->send_port_cache) { + tx_qid = cached_tx_port->last_used_qid; + + if (tx_qid != LAST_USED_QID_NONE) { + netdev_txq_flush(cached_tx_port->port->netdev, tx_qid, + cached_tx_port->port->dynamic_txqs); + + /* Queue flushed and mark it empty. */ + cached_tx_port->last_used_qid = LAST_USED_QID_NONE; + } + } +} + +static void dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd, struct netdev_rxq *rx, odp_port_t port_no) @@ -4355,6 +4381,7 @@ dp_netdev_add_port_tx_to_pmd(struct dp_netdev_pmd_thread *pmd, tx->port = port; tx->qid = -1; + tx->last_used_qid = LAST_USED_QID_NONE; hmap_insert(&pmd->tx_ports, &tx->node, hash_port_no(tx->port->port_no)); pmd->need_reload = true; @@ -4925,6 +4952,14 @@ dpif_netdev_xps_get_tx_qid(const struct dp_netdev_pmd_thread *pmd, dpif_netdev_xps_revalidate_pmd(pmd, now, false); + /* The tx queue can change in XPS case, make sure packets in previous + * queue is flushed properly. */ + if (tx->last_used_qid != LAST_USED_QID_NONE && + tx->qid != tx->last_used_qid) { + netdev_txq_flush(port->netdev, tx->last_used_qid, port->dynamic_txqs); + tx->last_used_qid = LAST_USED_QID_NONE; + } + VLOG_DBG("Core %d: New TX queue ID %d for port \'%s\'.", pmd->core_id, tx->qid, netdev_get_name(tx->port->netdev)); return min_qid; @@ -5020,6 +5055,13 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_, tx_qid = pmd->static_tx_qid; } + /* In case these packets gets buffered into an intermediate + * queue and XPS is enabled the flush function could find a + * different tx qid assigned to its thread. We keep track + * of the qid we're now using, that will trigger the flush + * function and will select the right queue to flush. */ + p->last_used_qid = tx_qid; + netdev_send(p->port->netdev, tx_qid, packets_, may_steal, dynamic_txqs); return; diff --git a/lib/netdev-dpdk.c b/lib/netdev-dpdk.c index 51d528d..99ad8c7 100644 --- a/lib/netdev-dpdk.c +++ b/lib/netdev-dpdk.c @@ -1444,6 +1444,7 @@ static inline int netdev_dpdk_eth_tx_burst(struct netdev_dpdk *dev, int qid, struct rte_mbuf **pkts, int cnt) { + struct dpdk_tx_queue *txq = &dev->tx_q[qid]; uint32_t nb_tx = 0; while (nb_tx != cnt) { @@ -1467,6 +1468,7 @@ netdev_dpdk_eth_tx_burst(struct netdev_dpdk *dev, int qid, } } + txq->dpdk_pkt_cnt = 0; return cnt - nb_tx; } @@ -1866,6 +1868,37 @@ out: } } +/* Enqueue packets in an intermediate queue and call the flush + * function when the queue is full. This way we can amortize the + * cost of MMIO writes. */ +static inline int +netdev_dpdk_eth_tx_queue(struct netdev_dpdk *dev, int qid, + struct rte_mbuf **pkts, int cnt) +{ + struct dpdk_tx_queue *txq = &dev->tx_q[qid]; + + int i = 0; + int dropped = 0; + + while (i < cnt) { + int freeslots = INTERIM_QUEUE_BURST_THRESHOLD - txq->dpdk_pkt_cnt; + int tocopy = MIN(freeslots, cnt-i); + + memcpy(&txq->dpdk_burst_pkts[txq->dpdk_pkt_cnt], &pkts[i], + tocopy * sizeof (struct rte_mbuf *)); + + txq->dpdk_pkt_cnt += tocopy; + i += tocopy; + + /* Queue full, burst the packets. */ + if (txq->dpdk_pkt_cnt >= INTERIM_QUEUE_BURST_THRESHOLD) { + dropped += netdev_dpdk_eth_tx_burst(dev, qid, txq->dpdk_burst_pkts, + txq->dpdk_pkt_cnt); + } + } + return dropped; +} + /* Tx function. Transmit packets indefinitely */ static void dpdk_do_tx_copy(struct netdev *netdev, int qid, struct dp_packet_batch *batch) @@ -1923,7 +1956,7 @@ dpdk_do_tx_copy(struct netdev *netdev, int qid, struct dp_packet_batch *batch) newcnt = netdev_dpdk_qos_run(dev, pkts, newcnt); dropped += qos_pkts - newcnt; - dropped += netdev_dpdk_eth_tx_burst(dev, qid, pkts, newcnt); + dropped += netdev_dpdk_eth_tx_queue(dev, qid, pkts, newcnt); } if (OVS_UNLIKELY(dropped)) { @@ -1981,7 +2014,7 @@ netdev_dpdk_send__(struct netdev_dpdk *dev, int qid, cnt = netdev_dpdk_qos_run(dev, pkts, cnt); dropped = batch->count - cnt; - dropped += netdev_dpdk_eth_tx_burst(dev, qid, pkts, cnt); + dropped += netdev_dpdk_eth_tx_queue(dev, qid, pkts, cnt); if (OVS_UNLIKELY(dropped)) { rte_spinlock_lock(&dev->stats_lock);