@@ -274,9 +274,18 @@ pmd_perf_format_overall_stats(struct ds *str, struct pmd_perf_stats *s,
if (tx_packets > 0) {
ds_put_format(str,
" Tx packets: %12"PRIu64" (%.0f Kpps)\n"
- " Tx batches: %12"PRIu64" (%.2f pkts/batch)\n",
+ " Tx batches: %12"PRIu64" (%.2f pkts/batch)\n"
+ "\n"
+ " - Work deferred: %12"PRIu64"\n"
+ " - Work deferred again: %12"PRIu64"\n"
+ " - Ring full when deferring work: %12"PRIu64"\n"
+ " - Deferred work done: %12"PRIu64"\n",
tx_packets, (tx_packets / duration) / 1000,
- tx_batches, 1.0 * tx_packets / tx_batches);
+ tx_batches, 1.0 * tx_packets / tx_batches,
+ stats[PMD_STAT_WORK_DEFER],
+ stats[PMD_STAT_WORK_RE_DEFER],
+ stats[PMD_STAT_WORK_R_FULL],
+ stats[PMD_STAT_WORK_DONE]);
} else {
ds_put_format(str, " Tx packets: %12d\n\n", 0);
}
@@ -74,6 +74,13 @@ enum pmd_stat_type {
* recirculation. */
PMD_STAT_SENT_PKTS, /* Packets that have been sent. */
PMD_STAT_SENT_BATCHES, /* Number of batches sent. */
+ PMD_STAT_WORK_DEFER, /* Number of times that work was deferred. */
+ PMD_STAT_WORK_RE_DEFER, /* Number of times that already deferred work was
+ * deferred again. */
+ PMD_STAT_WORK_R_FULL, /* Number of times work ring was full when
+ * deferring work. */
+ PMD_STAT_WORK_DONE, /* Number of times that deferred work was
+ * completed. */
PMD_CYCLES_ITER_IDLE, /* Cycles spent in idle iterations. */
PMD_CYCLES_ITER_BUSY, /* Cycles spent in busy iterations. */
PMD_CYCLES_UPCALL, /* Cycles spent processing upcalls. */
@@ -82,6 +82,9 @@
#include "util.h"
#include "uuid.h"
+/* TODO: Avoid relying solely on the DPDK ring library in lib/dpif-netdev.c. */
+#include <rte_ring.h>
+
VLOG_DEFINE_THIS_MODULE(dpif_netdev);
/* Auto Load Balancing Defaults */
@@ -305,6 +308,20 @@ struct pmd_auto_lb {
atomic_uint8_t rebalance_load_thresh;
};
+/* Function prototype for deferred work callbacks. */
+typedef int (*dp_defer_work_func)(struct netdev *netdev, int qid);
+
+/* Work item tracking a piece of outstanding work for a netdev Tx queue. */
+struct dp_defer_work_item {
+ dp_defer_work_func work_func;
+ struct netdev *netdev;
+ int qid;
+};
+
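+/* Per-PMD-thread deferred work state: a single-producer, single-consumer
+ * ring of outstanding work items. */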
+struct dp_defer {
+ struct rte_ring *avail_work_ring;
+};
+
/* Datapath based on the network device interface from netdev.h.
*
*
@@ -630,6 +647,7 @@ struct tx_port {
long long flush_time;
struct dp_packet_batch output_pkts;
struct dp_netdev_rxq *output_pkts_rxqs[NETDEV_MAX_BURST];
+ dp_defer_work_func cached_work_func; /* Cached 'process_async' callback. */
};
/* Contained by struct tx_bond 'member_buckets'. */
@@ -809,6 +827,9 @@ struct dp_netdev_pmd_thread {
/* Next time when PMD should try RCU quiescing. */
long long next_rcu_quiesce;
+
+ /* Structure to track deferred work in this thread. */
+ struct dp_defer defer;
};
/* Interface to netdev-based datapath. */
@@ -818,6 +839,66 @@ struct dpif_netdev {
uint64_t last_port_seq;
};
+static int32_t
+dp_defer_init(struct dp_netdev_pmd_thread *pmd)
+{
+#define RING_NAME_SIZE 20
+ struct dp_defer *defer = &pmd->defer;
+ char ring_name[RING_NAME_SIZE];
+
+ snprintf(ring_name, RING_NAME_SIZE, "pmd_id%u_work_ring", pmd->core_id);
+
+ /* Create SP/SC work ring. */
+ defer->avail_work_ring = rte_ring_create_elem(ring_name,
+ sizeof(struct dp_defer_work_item), /* Ring element size. */
+ 128, /* Number of elements. */
+ 0, /* Socket id. */
+ RING_F_SP_ENQ | RING_F_SC_DEQ); /* Flags. */
+
+ ovs_assert(defer->avail_work_ring);
+ VLOG_INFO("defer initialized, %p\n", defer->avail_work_ring);
+ return 0;
+}
+
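+/* Enqueues one work item on the PMD thread's work ring. Returns 0 on
+ * success, or -ENOBUFS if the ring is full. */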
+static inline int
+dp_defer_work(struct dp_netdev_pmd_thread *pmd,
+ struct dp_defer_work_item *work)
+{
+ int err = rte_ring_enqueue_elem(pmd->defer.avail_work_ring, work,
+ sizeof(struct dp_defer_work_item));
+ return err;
+}
+
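+/* Dequeues and executes one piece of deferred work, re-deferring it if it is
+ * still in progress. Returns 0 if an item was dequeued, or -ENOENT if the
+ * work ring is empty. */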
+static inline int
+dp_defer_do_work(struct dp_netdev_pmd_thread *pmd)
+{
+ struct dp_defer_work_item work;
+
+ int err = rte_ring_dequeue_elem(pmd->defer.avail_work_ring, &work,
+ sizeof(struct dp_defer_work_item));
+ if (!err) {
+ int ret = work.work_func(work.netdev, work.qid);
+
+ if (ret == -EINPROGRESS) {
+ /* Defer the work again. Don't check the return status of
+ * dp_defer_work() since it can't fail. There will always be room
+ * to defer one piece of work after the dequeue above. */
+ dp_defer_work(pmd, &work);
+ pmd_perf_update_counter(&pmd->perf_stats, PMD_STAT_WORK_RE_DEFER,
+ 1);
+ } else {
+ pmd_perf_update_counter(&pmd->perf_stats, PMD_STAT_WORK_DONE, 1);
+ }
+ }
+ return err;
+}
+
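+/* Returns the number of work items currently on the PMD thread's work
+ * ring. */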
+static inline unsigned int
+dp_defer_work_count(struct dp_netdev_pmd_thread *pmd)
+{
+ return rte_ring_count(pmd->defer.avail_work_ring);
+}
+
static int get_port_by_number(struct dp_netdev *dp, odp_port_t port_no,
struct dp_netdev_port **portp)
OVS_REQUIRES(dp->port_mutex);
@@ -4661,6 +4742,7 @@ dp_netdev_pmd_flush_output_on_port(struct dp_netdev_pmd_thread *pmd,
int output_cnt;
bool dynamic_txqs;
struct cycle_timer timer;
+ struct netdev *netdev;
uint64_t cycles;
uint32_t tx_flush_interval;
@@ -4676,7 +4758,28 @@ dp_netdev_pmd_flush_output_on_port(struct dp_netdev_pmd_thread *pmd,
output_cnt = dp_packet_batch_size(&p->output_pkts);
ovs_assert(output_cnt > 0);
- netdev_send(p->port->netdev, tx_qid, &p->output_pkts, dynamic_txqs);
+ netdev = p->port->netdev;
+ int ret = netdev_send(netdev, tx_qid, &p->output_pkts,
+ dynamic_txqs);
+
+ if (ret == -EINPROGRESS) {
+ struct dp_defer_work_item work = {
+ .work_func = p->cached_work_func,
+ .netdev = netdev,
+ .qid = tx_qid,
+ };
+
+ /* Defer the work. */
+ while (dp_defer_work(pmd, &work)) {
+ /* The work ring is full, try to make room by doing work. Doing
+ * work can fail to make room if the work has to be requeued. Keep
+ * trying to do work until there is room in the ring. */
+ dp_defer_do_work(pmd);
+ pmd_perf_update_counter(&pmd->perf_stats, PMD_STAT_WORK_R_FULL, 1);
+ }
+ pmd_perf_update_counter(&pmd->perf_stats, PMD_STAT_WORK_DEFER, 1);
+ }
+
dp_packet_batch_init(&p->output_pkts);
/* Update time of the next flush. */
@@ -4686,6 +4789,9 @@ dp_netdev_pmd_flush_output_on_port(struct dp_netdev_pmd_thread *pmd,
ovs_assert(pmd->n_output_batches > 0);
pmd->n_output_batches--;
+ /* The batch and packet counters are updated as sent here, even though
+ * some packets might have been dropped or may still be in transit
+ * asynchronously. */
pmd_perf_update_counter(&pmd->perf_stats, PMD_STAT_SENT_PKTS, output_cnt);
pmd_perf_update_counter(&pmd->perf_stats, PMD_STAT_SENT_BATCHES, 1);
@@ -6068,10 +6174,19 @@ reload:
tx_packets = dp_netdev_pmd_flush_output_packets(pmd, false);
}
+ /* Try to do every piece of work currently on the work ring once. Take
+ * the count up front so that completed items do not shorten the pass and
+ * re-deferred items are not retried immediately. */
+ int defer_work_cnt = dp_defer_work_count(pmd);
+
+ for (i = 0; i < defer_work_cnt; i++) {
+ dp_defer_do_work(pmd);
+ }
+
/* Do RCU synchronization at fixed interval. This ensures that
* synchronization would not be delayed long even at high load of
* packet processing. */
if (pmd->ctx.now > pmd->next_rcu_quiesce) {
+ /* Do any work outstanding on this PMD thread. */
+ while (!dp_defer_do_work(pmd)) {
+ continue;
+ }
if (!ovsrcu_try_quiesce()) {
pmd->next_rcu_quiesce =
pmd->ctx.now + PMD_RCU_QUIESCE_INTERVAL;
@@ -6080,7 +6195,10 @@ reload:
if (lc++ > 1024) {
lc = 0;
-
+ /* Do any work outstanding on this PMD thread. */
+ while (!dp_defer_do_work(pmd)) {
+ continue;
+ }
coverage_try_clear();
dp_netdev_pmd_try_optimize(pmd, poll_list, poll_cnt);
if (!ovsrcu_try_quiesce()) {
@@ -6572,6 +6690,8 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
pmd_perf_stats_init(&pmd->perf_stats);
cmap_insert(&dp->poll_threads, CONST_CAST(struct cmap_node *, &pmd->node),
hash_int(core_id, 0));
+
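+ /* Create the deferred work ring for this PMD thread. */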
+ dp_defer_init(pmd);
}
static void
@@ -6741,6 +6861,7 @@ dp_netdev_add_port_tx_to_pmd(struct dp_netdev_pmd_thread *pmd,
tx->qid = -1;
tx->flush_time = 0LL;
dp_packet_batch_init(&tx->output_pkts);
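+ /* Cache the netdev's 'process_async' callback so that work can be
+ * deferred when a send returns -EINPROGRESS. */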
+ tx->cached_work_func = port->netdev->netdev_class->process_async;
hmap_insert(&pmd->tx_ports, &tx->node, hash_port_no(tx->port->port_no));
pmd->need_reload = true;
@@ -2585,7 +2585,7 @@ netdev_dpdk_vhost_update_tx_counters(struct netdev_dpdk *dev,
}
}
-static void
+static int
__netdev_dpdk_vhost_send(struct netdev *netdev, int qid,
struct dp_packet **pkts, int cnt)
{
@@ -2667,6 +2667,8 @@ out:
for (i = 0; i < n_packets_to_free; i++) {
dp_packet_delete(pkts[i]);
}
+
+ return 0;
}
static void
@@ -2774,7 +2776,7 @@ dpdk_copy_dp_packet_to_mbuf(struct rte_mempool *mp, struct dp_packet *pkt_orig)
}
/* Tx function. Transmit packets indefinitely */
-static void
+static int
dpdk_do_tx_copy(struct netdev *netdev, int qid, struct dp_packet_batch *batch)
OVS_NO_THREAD_SAFETY_ANALYSIS
{
@@ -2793,6 +2795,7 @@ dpdk_do_tx_copy(struct netdev *netdev, int qid, struct dp_packet_batch *batch)
uint32_t tx_failure = 0;
uint32_t mtu_drops = 0;
uint32_t qos_drops = 0;
+ int ret = 0;
if (dev->type != DPDK_DEV_VHOST) {
/* Check if QoS has been configured for this netdev. */
@@ -2826,7 +2829,7 @@ dpdk_do_tx_copy(struct netdev *netdev, int qid, struct dp_packet_batch *batch)
if (OVS_LIKELY(txcnt)) {
if (dev->type == DPDK_DEV_VHOST) {
- __netdev_dpdk_vhost_send(netdev, qid, pkts, txcnt);
+ ret = __netdev_dpdk_vhost_send(netdev, qid, pkts, txcnt);
} else {
tx_failure += netdev_dpdk_eth_tx_burst(dev, qid,
(struct rte_mbuf **)pkts,
@@ -2843,6 +2846,8 @@ dpdk_do_tx_copy(struct netdev *netdev, int qid, struct dp_packet_batch *batch)
sw_stats->tx_qos_drops += qos_drops;
rte_spinlock_unlock(&dev->stats_lock);
}
+
+ return ret;
}
static int
@@ -2851,14 +2856,15 @@ netdev_dpdk_vhost_send(struct netdev *netdev, int qid,
bool concurrent_txq OVS_UNUSED)
{
+ int ret = 0;
if (OVS_UNLIKELY(batch->packets[0]->source != DPBUF_DPDK)) {
- dpdk_do_tx_copy(netdev, qid, batch);
+ ret = dpdk_do_tx_copy(netdev, qid, batch);
dp_packet_delete_batch(batch, true);
} else {
- __netdev_dpdk_vhost_send(netdev, qid, batch->packets,
- dp_packet_batch_size(batch));
+ ret = __netdev_dpdk_vhost_send(netdev, qid, batch->packets,
+ dp_packet_batch_size(batch));
}
- return 0;
+ return ret;
}
static inline void
@@ -5348,6 +5354,7 @@ static const struct netdev_class dpdk_vhost_class = {
.construct = netdev_dpdk_vhost_construct,
.destruct = netdev_dpdk_vhost_destruct,
.send = netdev_dpdk_vhost_send,
+ .process_async = NULL,
.get_carrier = netdev_dpdk_vhost_get_carrier,
.get_stats = netdev_dpdk_vhost_get_stats,
.get_custom_stats = netdev_dpdk_get_sw_custom_stats,
@@ -5364,6 +5371,7 @@ static const struct netdev_class dpdk_vhost_client_class = {
.destruct = netdev_dpdk_vhost_destruct,
.set_config = netdev_dpdk_vhost_client_set_config,
.send = netdev_dpdk_vhost_send,
+ .process_async = NULL,
.get_carrier = netdev_dpdk_vhost_get_carrier,
.get_stats = netdev_dpdk_vhost_get_stats,
.get_custom_stats = netdev_dpdk_get_sw_custom_stats,
@@ -384,7 +384,11 @@ struct netdev_class {
* if it would always return EOPNOTSUPP anyhow. (This will prevent the
* network device from being usefully used by the netdev-based "userspace
* datapath". It will also prevent the OVS implementation of bonding from
- * working properly over 'netdev'.) */
+ * working properly over 'netdev'.)
+ *
+ * May return -EINPROGRESS. This indicates that the netdev has outstanding
+ * work for this batch and that 'process_async' must be called until that
+ * work has fully completed. */
int (*send)(struct netdev *netdev, int qid, struct dp_packet_batch *batch,
bool concurrent_txq);
@@ -402,6 +406,15 @@ struct netdev_class {
* implement packet transmission through the 'send' member function. */
void (*send_wait)(struct netdev *netdev, int qid);
+ /* Performs asynchronous work required by the netdev to complete sending
+ * its buffers. The work done by 'process_async' is netdev specific, but
+ * may include freeing packets or updating port stats.
+ *
+ * May return -EINPROGRESS if the asynchronous work has not yet completed,
+ * in which case 'process_async' should be called again for this netdev
+ * and queue at a later time. */
+ int (*process_async)(struct netdev *netdev, int qid);
+
/* Sets 'netdev''s Ethernet address to 'mac' */
int (*set_etheraddr)(struct netdev *netdev, const struct eth_addr mac);
@@ -892,7 +892,8 @@ netdev_send(struct netdev *netdev, int qid, struct dp_packet_batch *batch,
}
error = netdev->netdev_class->send(netdev, qid, batch, concurrent_txq);
- if (!error) {
+ /* For asynchronous sends, -EINPROGRESS means the batch was handed to the
+ * netdev and only completion work remains, so count it as sent. */
+ if (!error || error == -EINPROGRESS) {
COVERAGE_INC(netdev_sent);
}
return error;