
[ovs-dev,v2,1/3] dpif-netdev: Add a per thread work ring

Message ID 20210907111725.43672-2-cian.ferriter@intel.com
State Rejected
Headers show
Series Userspace deferral of work

Checks

Context Check Description
ovsrobot/apply-robot success apply and check: success
ovsrobot/github-robot-_Build_and_Test success github build: passed

Commit Message

Ferriter, Cian Sept. 7, 2021, 11:17 a.m. UTC
These work rings help with handling the asynchronous TX use case. In
this use case, netdev_send is called, but the packets are not sent
immediately by the calling thread; they are instead handled by a
different resource. Since the TX is not instantaneous, the thread
calling netdev_send can't immediately free the packets being sent, or
report them as sent. Rather than having the thread poll for completion
of the TX, it is desirable for the thread to move on and process more
packets.

The work ring serves as a FIFO queue to keep track of the asynchronous
TX calls that have been kicked off. Work is added/queued to the ring
when netdev_send returns '-EINPROGRESS', indicating that it kicked off
an asynchronous TX. Work is taken/dequeued from the ring in two main
cases:
1. In pmd_thread_main, after processing every rxq assigned to the thread.
2. When the ring is full while trying to queue work.
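
For illustration, the intended flow looks roughly like this (a condensed
sketch of the code in this patch, with stats and error handling omitted):

    /* TX path: if the netdev reports that the send completes
     * asynchronously, remember the completion work instead of waiting. */
    int ret = netdev_send(netdev, tx_qid, &p->output_pkts, dynamic_txqs);
    if (ret == -EINPROGRESS) {
        struct dp_defer_work_item work = {
            .work_func = p->cached_work_func,  /* The netdev's process_async. */
            .netdev = netdev,
            .qid = tx_qid,
        };
        dp_defer_work(&pmd->defer, perf_stats, &work);
    }

    /* Later, in pmd_thread_main(), after polling the rxqs: drain the ring
     * until it is empty or the head item is still in progress. */
    while (!dp_defer_do_work(&pmd->defer, s)) {
        continue;
    }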

Some dp_defer functions are defined in dpif-netdev.c. It would be nice
to define these in dpif-netdev-private-defer.h, but these functions rely
on the cycle_timer_stop/start() functions and dp_netdev_rxq_add_cycles().
I tried to move those to a header so they could be included, but they
rely on more structs. I stopped there because I would have had to move
bigger structs like "struct dp_netdev_rxq", which rely on even more
structs, all defined in dpif-netdev.c.

Signed-off-by: Cian Ferriter <cian.ferriter@intel.com>
Co-authored-by: Harry van Haaren <harry.van.haaren@intel.com>
Signed-off-by: Harry van Haaren <harry.van.haaren@intel.com>
Co-authored-by: Sunil Pai G <sunil.pai.g@intel.com>
Signed-off-by: Sunil Pai G <sunil.pai.g@intel.com>

---

v2:
- Implement and use a simpler ring buffer in OVS, rather than using the
  DPDK implementation.
- Only print work defer stats if some work has actually been deferred.
- Add a "force" flag to the "process_async()" API to implement an
  attempt limit on the number of times an asynchronous piece of work
  should be attempted.
- Do all outstanding work on a PMD thread before allowing a reload to
  occur.
---
 lib/automake.mk                  |   1 +
 lib/dpif-netdev-perf.c           |  20 ++++-
 lib/dpif-netdev-perf.h           |   9 ++
 lib/dpif-netdev-private-defer.h  |  84 +++++++++++++++++++
 lib/dpif-netdev-private-thread.h |   4 +
 lib/dpif-netdev.c                | 139 +++++++++++++++++++++++++++++--
 lib/netdev-dpdk.c                |  22 +++--
 lib/netdev-provider.h            |  19 ++++-
 lib/netdev.c                     |   3 +-
 9 files changed, 286 insertions(+), 15 deletions(-)
 create mode 100644 lib/dpif-netdev-private-defer.h

Comments

Aaron Conole Sept. 10, 2021, 6:36 p.m. UTC | #1
Hi Cian,

Apologies for the mix of code and design comments.  I have lots of
intermingled issues here (but I also didn't take too detailed a look at
the implementation).

Cian Ferriter <cian.ferriter@intel.com> writes:

> These work rings help with handling the asynchronous TX use case. In
> this use case, netdev_send is called, but the packets are not sent
> immediately by the calling thread; they are instead handled by a
> different resource. Since the TX is not instantaneous, the thread
> calling netdev_send can't immediately free the packets being sent, or
> report them as sent. Rather than having the thread poll for completion
> of the TX, it is desirable for the thread to move on and process more
> packets.
>
> The work ring serves as a FIFO queue to keep track of the asynchronous
> TX calls that have been kicked off. Work is added/queued to the ring
> when netdev_send returns '-EINPROGRESS', indicating that it kicked off
> an asynchronous TX. Work is taken/dequeued from the ring in two main
> cases:
> 1. In pmd_thread_main, after processing every rxq assigned to the thread.
> 2. When the ring is full while trying to queue work.

I'm not sold on the overall idea - why can't the vhost library perform
the DMA internally?  Other devices (netdev-linux, for example) manage
DMA transfers externally to OVS.  It seems like you're making a change
here that will impact the whole PMD design for vhost only.

> Some dp_defer functions are defined in dpif-netdev.c. It would be nice
> to define these in dpif-netdev-private-defer.h, but these functions rely
> on the cycle_timer_stop/start() functions and dp_netdev_rxq_add_cycles().
> I tried to move those to a header so they could be included, but they
> rely on more structs. I stopped there because I would have had to move
> bigger structs like "struct dp_netdev_rxq", which rely on even more
> structs, all defined in dpif-netdev.c.
>
> Signed-off-by: Cian Ferriter <cian.ferriter@intel.com>
> Co-authored-by: Harry van Haaren <harry.van.haaren@intel.com>
> Signed-off-by: Harry van Haaren <harry.van.haaren@intel.com>
> Co-authored-by: Sunil Pai G <sunil.pai.g@intel.com>
> Signed-off-by: Sunil Pai G <sunil.pai.g@intel.com>
>
> ---
>
> v2:
> - Implement and use a simpler ring buffer in OVS, rather than using the
>   DPDK implementation.
> - Only print work defer stats if some work has actually been deferred.
> - Add a "force" flag to the "process_async()" API to implement an
>   attempt limit on the number of times an asynchronous piece of work
>   should be attempted.
> - Do all outstanding work on a PMD thread before allowing a reload to
>   occur.
> ---
>  lib/automake.mk                  |   1 +
>  lib/dpif-netdev-perf.c           |  20 ++++-
>  lib/dpif-netdev-perf.h           |   9 ++
>  lib/dpif-netdev-private-defer.h  |  84 +++++++++++++++++++
>  lib/dpif-netdev-private-thread.h |   4 +
>  lib/dpif-netdev.c                | 139 +++++++++++++++++++++++++++++--
>  lib/netdev-dpdk.c                |  22 +++--
>  lib/netdev-provider.h            |  19 ++++-
>  lib/netdev.c                     |   3 +-
>  9 files changed, 286 insertions(+), 15 deletions(-)
>  create mode 100644 lib/dpif-netdev-private-defer.h
>
> diff --git a/lib/automake.mk b/lib/automake.mk
> index 46f869a33..0d910bc92 100644
> --- a/lib/automake.mk
> +++ b/lib/automake.mk
> @@ -115,6 +115,7 @@ lib_libopenvswitch_la_SOURCES = \
>  	lib/dpif-netdev-lookup-generic.c \
>  	lib/dpif-netdev.c \
>  	lib/dpif-netdev.h \
> +	lib/dpif-netdev-private-defer.h \
>  	lib/dpif-netdev-private-dfc.c \
>  	lib/dpif-netdev-private-dfc.h \
>  	lib/dpif-netdev-private-dpcls.h \
> diff --git a/lib/dpif-netdev-perf.c b/lib/dpif-netdev-perf.c
> index d7676ea2b..859ef300c 100644
> --- a/lib/dpif-netdev-perf.c
> +++ b/lib/dpif-netdev-perf.c
> @@ -230,6 +230,7 @@ pmd_perf_format_overall_stats(struct ds *str, struct pmd_perf_stats *s,
>      uint64_t tot_iter = histogram_samples(&s->pkts);
>      uint64_t idle_iter = s->pkts.bin[0];
>      uint64_t busy_iter = tot_iter >= idle_iter ? tot_iter - idle_iter : 0;
> +    uint64_t work_deferred = stats[PMD_STAT_WORK_DEFER];
>  
>      ds_put_format(str,
>              "  Iterations:        %12"PRIu64"  (%.2f us/it)\n"
> @@ -284,7 +285,24 @@ pmd_perf_format_overall_stats(struct ds *str, struct pmd_perf_stats *s,
>              tx_packets, (tx_packets / duration) / 1000,
>              tx_batches, 1.0 * tx_packets / tx_batches);
>      } else {
> -        ds_put_format(str, "  Tx packets:        %12d\n\n", 0);
> +        ds_put_format(str, "  Tx packets:        %12d\n", 0);
> +    }
> +    if (work_deferred > 0) {
> +        uint64_t work_compl_checks = stats[PMD_STAT_WORK_IN_PROG] +
> +                                     stats[PMD_STAT_WORK_DONE];
> +
> +        ds_put_format(str,
> +            "  Work deferred:                   %12"PRIu64"\n"
> +            "  - Deferred work done:            %12"PRIu64"\n"
> +            "  - Work completion checks:        %12"PRIu64
> +                                                "  (%.2f checks/work item)\n"
> +            "  - Ring full when deferring work: %12"PRIu64"\n"
> +            "  - Deferred work dropped:         %12"PRIu64"\n",
> +            work_deferred, stats[PMD_STAT_WORK_DONE], work_compl_checks,
> +            1.0 * work_compl_checks / stats[PMD_STAT_WORK_DONE],
> +            stats[PMD_STAT_WORK_R_FULL], stats[PMD_STAT_WORK_DROPPED]);
> +    } else {
> +        ds_put_format(str, "  Work deferred:     %12d\n\n", 0);
>      }
>  }
>  
> diff --git a/lib/dpif-netdev-perf.h b/lib/dpif-netdev-perf.h
> index 834c26260..e9c02a866 100644
> --- a/lib/dpif-netdev-perf.h
> +++ b/lib/dpif-netdev-perf.h
> @@ -76,6 +76,15 @@ enum pmd_stat_type {
>                               * recirculation. */
>      PMD_STAT_SENT_PKTS,     /* Packets that have been sent. */
>      PMD_STAT_SENT_BATCHES,  /* Number of batches sent. */
> +    PMD_STAT_WORK_DEFER,    /* Number of times that work was deferred. */
> +    PMD_STAT_WORK_IN_PROG,  /* Number of times that work was still in progress
> +                               when checked by a thread. */
> +    PMD_STAT_WORK_R_FULL,   /* Number of times work ring was full when
> +                             * deferring work. */
> +    PMD_STAT_WORK_DONE,     /* Number of times that deferred work was
> +                             * completed. */
> +    PMD_STAT_WORK_DROPPED,  /* Number of times that deferred work was dropped.
> +                             */
>      PMD_CYCLES_ITER_IDLE,   /* Cycles spent in idle iterations. */
>      PMD_CYCLES_ITER_BUSY,   /* Cycles spent in busy iterations. */
>      PMD_CYCLES_UPCALL,      /* Cycles spent processing upcalls. */
> diff --git a/lib/dpif-netdev-private-defer.h b/lib/dpif-netdev-private-defer.h
> new file mode 100644
> index 000000000..78c140f56
> --- /dev/null
> +++ b/lib/dpif-netdev-private-defer.h
> @@ -0,0 +1,84 @@
> +/*
> + * Copyright (c) 2021 Intel Corporation.
> + *
> + * Licensed under the Apache License, Version 2.0 (the "License");
> + * you may not use this file except in compliance with the License.
> + * You may obtain a copy of the License at:
> + *
> + *     http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +#ifndef DPIF_NETDEV_PRIVATE_DEFER_H
> +#define DPIF_NETDEV_PRIVATE_DEFER_H 1
> +
> +#include <stdbool.h>
> +#include <stdint.h>
> +
> +#include "dpif.h"
> +#include "dpif-netdev-perf.h"
> +#include "cmap.h"
> +
> +#ifdef  __cplusplus
> +extern "C" {
> +#endif
> +
> +/* Function definition for deferred work. */
> +typedef int (*dp_defer_work_func)(struct netdev *netdev, int qid, bool force);
> +
> +/* Structure to track outstanding work to be done. */
> +struct dp_defer_work_item {
> +    dp_defer_work_func work_func;
> +    void *netdev;
> +    int qid;
> +    uint32_t attempts;
> +};

This is incredibly broad and generic.  I am worried because it seems any
future developer would be able to queue an arbitrary work item here, and
this infrastructure isn't accounting for that.  Why do we need such a
work function?  Can we not just explicitly add support for this in the
PMD itself using the netdev API you add as a call through?

Why do we even need it, though?  For example, DMA is handled in netlink
datapath already by the driver.  Why can't the userspace drivers manage
this internally as part of their work as well?

> +
> +#define WORK_RING_SIZE 128
> +#define WORK_RING_MASK (WORK_RING_SIZE - 1)

We probably need to put a comment about this - it isn't just an
assumption that the size is a power of two; the ring algorithm requires
that the size is a power of two.
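
Something along these lines (just a sketch) would make the requirement
explicit and catch violations at build time:

    /* WORK_RING_SIZE must be a power of two: the ring reduces its indexes
     * with "idx & WORK_RING_MASK", which is only equivalent to
     * "idx % WORK_RING_SIZE" when SIZE is 2^n, so that (SIZE - 1) forms a
     * contiguous bitmask.  With e.g. SIZE == 100, the mask would be 99
     * (0x63), and "idx & 99" would skip slots and break the ring. */
    #define WORK_RING_SIZE 128
    #define WORK_RING_MASK (WORK_RING_SIZE - 1)
    BUILD_ASSERT_DECL(!(WORK_RING_SIZE & (WORK_RING_SIZE - 1)));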

> +
> +#define ATTEMPT_LIMIT 1000
> +
> +/* The read and write indexes are between 0 and 2^32, and we mask their value
> + * when we access the work_ring[] array. */
> +struct dp_defer {
> +    uint32_t read_idx;
> +    uint32_t write_idx;
> +    struct dp_defer_work_item work_ring[WORK_RING_SIZE];
> +};
> +
> +static inline void
> +dp_defer_init(struct dp_defer *defer)
> +{
> +    defer->read_idx = 0;
> +    defer->write_idx = 0;
> +}
> +
> +static inline int
> +dp_defer_work_ring_empty(const struct dp_defer *defer)
> +{
> +    return defer->write_idx == defer->read_idx;
> +}
> +
> +static inline int
> +dp_defer_work_ring_full(const struct dp_defer *defer)
> +{
> +    /* When the write index is exactly (WORK_RING_SIZE - 1) or WORK_RING_MASK
> +     * elements ahead of the read index, the ring is full. When calculating the
> +     * difference between the indexes, wraparound is not an issue since
> +     * unsigned ints are used. */
> +    uint16_t count = (defer->write_idx - defer->read_idx) & WORK_RING_MASK;
> +
> +    return count == WORK_RING_MASK;
> +}
> +
> +#ifdef  __cplusplus
> +}
> +#endif
> +
> +#endif /* dpif-netdev-private-defer.h */
> diff --git a/lib/dpif-netdev-private-thread.h b/lib/dpif-netdev-private-thread.h
> index a782d9678..d14a5ade7 100644
> --- a/lib/dpif-netdev-private-thread.h
> +++ b/lib/dpif-netdev-private-thread.h
> @@ -20,6 +20,7 @@
>  
>  #include "dpif.h"
>  #include "dpif-netdev-perf.h"
> +#include "dpif-netdev-private-defer.h"
>  #include "dpif-netdev-private-dfc.h"
>  #include "dpif-netdev-private-dpif.h"
>  
> @@ -219,6 +220,9 @@ struct dp_netdev_pmd_thread {
>  
>      /* Next time when PMD should try RCU quiescing. */
>      long long next_rcu_quiesce;
> +
> +    /* Structure to track deferred work in this thread. */
> +    struct dp_defer defer;
>  };
>  
>  #ifdef  __cplusplus
> diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
> index b3e57bb95..f4143a93a 100644
> --- a/lib/dpif-netdev.c
> +++ b/lib/dpif-netdev.c
> @@ -435,6 +435,7 @@ struct tx_port {
>      long long flush_time;
>      struct dp_packet_batch output_pkts;
>      struct dp_netdev_rxq *output_pkts_rxqs[NETDEV_MAX_BURST];
> +    dp_defer_work_func cached_work_func;
>  };
>  
>  /* Contained by struct tx_bond 'member_buckets'. */
> @@ -4591,6 +4592,97 @@ pmd_perf_metrics_enabled(const struct dp_netdev_pmd_thread *pmd OVS_UNUSED)
>  }
>  #endif
>  
> +/* Try to do one piece of work from the work ring.
> + *
> + * Returns:
> + * -ENOENT: No work to do. The ring is empty.
> + * -EINPROGRESS: The work is still in progress, I can't do it.
> + * 0: One piece of work was taken from the ring. It was either successfully
> + *    handled, or dropped if attempted too many times.
> + */
> +static inline unsigned int

Don't use 'static inline' inside a .c file, please.  'static' should
suffice.  We generally try to avoid it (although some have slipped in
over the years - it would be nice to clean up).

> +dp_defer_do_work(struct dp_defer *defer, struct pmd_perf_stats *perf_stats)
> +{
> +    struct dp_defer_work_item *work;
> +    uint32_t read_idx;
> +    int ret;
> +
> +    /* Check that there's a piece of work in the ring to do. */
> +    if (dp_defer_work_ring_empty(defer)) {
> +        return -ENOENT;
> +    }
> +
> +    read_idx = defer->read_idx & WORK_RING_MASK;
> +    work = &defer->work_ring[read_idx];
> +    ret = work->work_func(work->netdev, work->qid, false);
> +
> +    if (ret == -EINPROGRESS) {
> +        pmd_perf_update_counter(perf_stats, PMD_STAT_WORK_IN_PROG, 1);
> +
> +        work->attempts++;
> +        if (work->attempts > ATTEMPT_LIMIT) {

It looks like the design can head-of-line (HoL) block here.  I.e.: we
don't know what the following work items have done.  They may have
completed, but we will be stuck for at least ATTEMPT_LIMIT iterations
before advancing.

Since this is always a FIFO, if a different 'async' work item gets added
(like compression / ipsec assist), such work can be stalled while we
wait for DMA to complete.

> +            ret = work->work_func(work->netdev, work->qid, true);
> +            defer->read_idx++;
> +
> +            if (ret) {
> +                pmd_perf_update_counter(perf_stats, PMD_STAT_WORK_DROPPED, 1);
> +            } else {
> +                pmd_perf_update_counter(perf_stats, PMD_STAT_WORK_DONE, 1);
> +            }
> +
> +            return 0;
> +        }
> +
> +        return ret;
> +    }

What if ret == EINVAL or something?  That gets squelched here.  Maybe
some debug logging would be needed.  Additionally, in some cases we
would count PMD_STAT_WORK_DROPPED (for example, on the final attempt
above we might get EACCES or something).  So these errors should be
counted somewhere consistent, I think.
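
Something like the following (only a sketch of the idea) would surface
those errors instead of silently treating them as completed work:

    /* Treat any error other than -EINPROGRESS as a failed work item, so it
     * is logged and counted rather than squelched. */
    static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);

    if (ret && ret != -EINPROGRESS) {
        VLOG_DBG_RL(&rl, "deferred work failed: %s", ovs_strerror(-ret));
        pmd_perf_update_counter(perf_stats, PMD_STAT_WORK_DROPPED, 1);
        defer->read_idx++;
        return 0;
    }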

> +
> +    defer->read_idx++;
> +
> +    pmd_perf_update_counter(perf_stats, PMD_STAT_WORK_DONE, 1);
> +
> +    return 0;
> +}
> +
> +static inline void
> +dp_defer_work(struct dp_defer *defer, struct pmd_perf_stats *perf_stats,
> +              struct dp_defer_work_item *work)
> +{
> +    struct dp_defer_work_item *ring_item;
> +    uint32_t write_idx;
> +
> +    /* Check that we have enough room in ring. */
> +    if (dp_defer_work_ring_full(defer)) {
> +        /* The work ring is full, try to make room by doing work. Doing work
> +         * can fail to make room if the work has to be requeued. Keep trying to
> +         * do work until there is room in the ring. */
> +        pmd_perf_update_counter(perf_stats, PMD_STAT_WORK_R_FULL, 1);
> +
> +        while (dp_defer_do_work(defer, perf_stats)) {
> +            continue;
> +        }
> +    }
> +
> +    write_idx = defer->write_idx & WORK_RING_MASK;
> +    ring_item = &defer->work_ring[write_idx];
> +
> +    ring_item->work_func = work->work_func;
> +    ring_item->netdev = work->netdev;
> +    ring_item->qid = work->qid;
> +    ring_item->attempts = 0;
> +
> +    defer->write_idx++;
> +
> +    pmd_perf_update_counter(perf_stats, PMD_STAT_WORK_DEFER, 1);
> +}
> +
> +static inline void
> +dp_defer_do_all_work(struct dp_defer *defer, struct pmd_perf_stats *perf_stats)
> +{
> +    while (dp_defer_do_work(defer, perf_stats) != -ENOENT) {
> +        continue;
> +    }
> +}
> +
>  static int
>  dp_netdev_pmd_flush_output_on_port(struct dp_netdev_pmd_thread *pmd,
>                                     struct tx_port *p)
> @@ -4600,10 +4692,12 @@ dp_netdev_pmd_flush_output_on_port(struct dp_netdev_pmd_thread *pmd,
>      int output_cnt;
>      bool dynamic_txqs;
>      struct cycle_timer timer;
> +    struct netdev *netdev;
>      uint64_t cycles;
>      uint32_t tx_flush_interval;
> +    struct pmd_perf_stats *perf_stats = &pmd->perf_stats;
>  
> -    cycle_timer_start(&pmd->perf_stats, &timer);
> +    cycle_timer_start(perf_stats, &timer);
>  
>      dynamic_txqs = p->port->dynamic_txqs;
>      if (dynamic_txqs) {
> @@ -4615,7 +4709,21 @@ dp_netdev_pmd_flush_output_on_port(struct dp_netdev_pmd_thread *pmd,
>      output_cnt = dp_packet_batch_size(&p->output_pkts);
>      ovs_assert(output_cnt > 0);
>  
> -    netdev_send(p->port->netdev, tx_qid, &p->output_pkts, dynamic_txqs);
> +    netdev = p->port->netdev;
> +    int ret = netdev_send(netdev, tx_qid, &p->output_pkts,
> +                          dynamic_txqs);
> +
> +    if (ret == -EINPROGRESS) {
> +        struct dp_defer_work_item work = {
> +            .work_func = p->cached_work_func,
> +            .netdev = netdev,
> +            .qid = tx_qid,
> +        };
> +
> +        /* Defer the work. */
> +        dp_defer_work(&pmd->defer, perf_stats, &work);
> +    }
> +
>      dp_packet_batch_init(&p->output_pkts);
>  
>      /* Update time of the next flush. */
> @@ -4625,12 +4733,15 @@ dp_netdev_pmd_flush_output_on_port(struct dp_netdev_pmd_thread *pmd,
>      ovs_assert(pmd->n_output_batches > 0);
>      pmd->n_output_batches--;
>  
> -    pmd_perf_update_counter(&pmd->perf_stats, PMD_STAT_SENT_PKTS, output_cnt);
> -    pmd_perf_update_counter(&pmd->perf_stats, PMD_STAT_SENT_BATCHES, 1);
> +    /* The batch and number of packets are updated as sent here, even though
> +     * some packets might have been dropped, or are in transit asynchronously.
> +     */
> +    pmd_perf_update_counter(perf_stats, PMD_STAT_SENT_PKTS, output_cnt);

Isn't this a problem when we hit the drop case in deferred work?

> +    pmd_perf_update_counter(perf_stats, PMD_STAT_SENT_BATCHES, 1);
>  
>      /* Distribute send cycles evenly among transmitted packets and assign to
>       * their respective rx queues. */
> -    cycles = cycle_timer_stop(&pmd->perf_stats, &timer) / output_cnt;
> +    cycles = cycle_timer_stop(perf_stats, &timer) / output_cnt;
>      for (i = 0; i < output_cnt; i++) {
>          if (p->output_pkts_rxqs[i]) {
>              dp_netdev_rxq_add_cycles(p->output_pkts_rxqs[i],
> @@ -6196,6 +6307,7 @@ reload:
>      ovs_mutex_lock(&pmd->perf_stats.stats_mutex);
>      for (;;) {
>          uint64_t rx_packets = 0, tx_packets = 0;
> +        struct dp_defer *defer = &pmd->defer;
>  
>          pmd_perf_start_iteration(s);
>  
> @@ -6228,10 +6340,20 @@ reload:
>              tx_packets = dp_netdev_pmd_flush_output_packets(pmd, false);
>          }
>  
> +        /* Try to clear the work ring. If a piece of work is still in progress,
> +         * don't attempt to do the remaining work items. They will be postponed
> +         * to the next iteration of pmd_thread_main(). */
> +        while (!dp_defer_do_work(defer, s)) {
> +            continue;
> +        }
> +
>          /* Do RCU synchronization at fixed interval.  This ensures that
>           * synchronization would not be delayed long even at high load of
>           * packet processing. */
>          if (pmd->ctx.now > pmd->next_rcu_quiesce) {
> +            /* Do any work outstanding on this PMD thread. */
> +            dp_defer_do_all_work(defer, s);
> +
>              if (!ovsrcu_try_quiesce()) {
>                  pmd->next_rcu_quiesce =
>                      pmd->ctx.now + PMD_RCU_QUIESCE_INTERVAL;
> @@ -6240,6 +6362,8 @@ reload:
>  
>          if (lc++ > 1024) {
>              lc = 0;
> +            /* Do any work outstanding on this PMD thread. */
> +            dp_defer_do_all_work(defer, s);
>  
>              coverage_try_clear();
>              dp_netdev_pmd_try_optimize(pmd, poll_list, poll_cnt);
> @@ -6262,6 +6386,8 @@ reload:
>  
>          atomic_read_explicit(&pmd->reload, &reload, memory_order_acquire);
>          if (OVS_UNLIKELY(reload)) {
> +            /* Do any work outstanding on this PMD thread. */
> +            dp_defer_do_all_work(defer, s);
>              break;
>          }
>  
> @@ -6748,6 +6874,8 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
>      pmd_perf_stats_init(&pmd->perf_stats);
>      cmap_insert(&dp->poll_threads, CONST_CAST(struct cmap_node *, &pmd->node),
>                  hash_int(core_id, 0));
> +
> +    dp_defer_init(&pmd->defer);
>  }
>  
>  static void
> @@ -6918,6 +7046,7 @@ dp_netdev_add_port_tx_to_pmd(struct dp_netdev_pmd_thread *pmd,
>      tx->qid = -1;
>      tx->flush_time = 0LL;
>      dp_packet_batch_init(&tx->output_pkts);
> +    tx->cached_work_func = port->netdev->netdev_class->process_async;
>  
>      hmap_insert(&pmd->tx_ports, &tx->node, hash_port_no(tx->port->port_no));
>      pmd->need_reload = true;
> diff --git a/lib/netdev-dpdk.c b/lib/netdev-dpdk.c
> index 45a96b9be..96d8210e3 100644
> --- a/lib/netdev-dpdk.c
> +++ b/lib/netdev-dpdk.c
> @@ -2585,7 +2585,7 @@ netdev_dpdk_vhost_update_tx_counters(struct netdev_dpdk *dev,
>      }
>  }
>  
> -static void
> +static int
>  __netdev_dpdk_vhost_send(struct netdev *netdev, int qid,
>                           struct dp_packet **pkts, int cnt)
>  {
> @@ -2667,6 +2667,8 @@ out:
>      for (i = 0; i < n_packets_to_free; i++) {
>          dp_packet_delete(pkts[i]);
>      }
> +
> +    return 0;
>  }
>  
>  static void
> @@ -2774,7 +2776,7 @@ dpdk_copy_dp_packet_to_mbuf(struct rte_mempool *mp, struct dp_packet *pkt_orig)
>  }
>  
>  /* Tx function. Transmit packets indefinitely */
> -static void
> +static int
>  dpdk_do_tx_copy(struct netdev *netdev, int qid, struct dp_packet_batch *batch)
>      OVS_NO_THREAD_SAFETY_ANALYSIS
>  {
> @@ -2793,6 +2795,7 @@ dpdk_do_tx_copy(struct netdev *netdev, int qid, struct dp_packet_batch *batch)
>      uint32_t tx_failure = 0;
>      uint32_t mtu_drops = 0;
>      uint32_t qos_drops = 0;
> +    int ret = 0;
>  
>      if (dev->type != DPDK_DEV_VHOST) {
>          /* Check if QoS has been configured for this netdev. */
> @@ -2826,7 +2829,7 @@ dpdk_do_tx_copy(struct netdev *netdev, int qid, struct dp_packet_batch *batch)
>  
>      if (OVS_LIKELY(txcnt)) {
>          if (dev->type == DPDK_DEV_VHOST) {
> -            __netdev_dpdk_vhost_send(netdev, qid, pkts, txcnt);
> +            ret = __netdev_dpdk_vhost_send(netdev, qid, pkts, txcnt);
>          } else {
>              tx_failure += netdev_dpdk_eth_tx_burst(dev, qid,
>                                                     (struct rte_mbuf **)pkts,
> @@ -2843,6 +2846,8 @@ dpdk_do_tx_copy(struct netdev *netdev, int qid, struct dp_packet_batch *batch)
>          sw_stats->tx_qos_drops += qos_drops;
>          rte_spinlock_unlock(&dev->stats_lock);
>      }
> +
> +    return ret;
>  }
>  
>  static int
> @@ -2851,14 +2856,15 @@ netdev_dpdk_vhost_send(struct netdev *netdev, int qid,
>                         bool concurrent_txq OVS_UNUSED)
>  {
>  
> +    int ret = 0;
>      if (OVS_UNLIKELY(batch->packets[0]->source != DPBUF_DPDK)) {
> -        dpdk_do_tx_copy(netdev, qid, batch);
> +        ret = dpdk_do_tx_copy(netdev, qid, batch);
>          dp_packet_delete_batch(batch, true);
>      } else {
> -        __netdev_dpdk_vhost_send(netdev, qid, batch->packets,
> -                                 dp_packet_batch_size(batch));
> +        ret = __netdev_dpdk_vhost_send(netdev, qid, batch->packets,
> +                                       dp_packet_batch_size(batch));
>      }
> -    return 0;
> +    return ret;
>  }
>  
>  static inline void
> @@ -5468,6 +5474,7 @@ static const struct netdev_class dpdk_vhost_class = {
>      .construct = netdev_dpdk_vhost_construct,
>      .destruct = netdev_dpdk_vhost_destruct,
>      .send = netdev_dpdk_vhost_send,
> +    .process_async = NULL,
>      .get_carrier = netdev_dpdk_vhost_get_carrier,
>      .get_stats = netdev_dpdk_vhost_get_stats,
>      .get_custom_stats = netdev_dpdk_get_sw_custom_stats,
> @@ -5484,6 +5491,7 @@ static const struct netdev_class dpdk_vhost_client_class = {
>      .destruct = netdev_dpdk_vhost_destruct,
>      .set_config = netdev_dpdk_vhost_client_set_config,
>      .send = netdev_dpdk_vhost_send,
> +    .process_async = NULL,
>      .get_carrier = netdev_dpdk_vhost_get_carrier,
>      .get_stats = netdev_dpdk_vhost_get_stats,
>      .get_custom_stats = netdev_dpdk_get_sw_custom_stats,
> diff --git a/lib/netdev-provider.h b/lib/netdev-provider.h
> index b5420947d..a448328e7 100644
> --- a/lib/netdev-provider.h
> +++ b/lib/netdev-provider.h
> @@ -384,7 +384,11 @@ struct netdev_class {
>       * if it would always return EOPNOTSUPP anyhow.  (This will prevent the
>       * network device from being usefully used by the netdev-based "userspace
>       * datapath".  It will also prevent the OVS implementation of bonding from
> -     * working properly over 'netdev'.) */
> +     * working properly over 'netdev'.)
> +     *
> +     * May return EINPROGRESS. This indicates that the netdev has more work to
> +     * do, and needs to have process_async called before sending buffers is
> +     * totally completed. */
>      int (*send)(struct netdev *netdev, int qid, struct dp_packet_batch *batch,
>                  bool concurrent_txq);
>  
> @@ -402,6 +406,19 @@ struct netdev_class {
>       * implement packet transmission through the 'send' member function. */
>      void (*send_wait)(struct netdev *netdev, int qid);
>  
> +    /* Performs asynchronous work required by the netdev to complete sending
> +     * buffers. The work done in the process_async function is netdev specific,
> +     * but could include freeing packets or updating port stats.
> +     *
> +     * If called with force = false, may return EINPROGRESS if the async call
> +     * still hasn't completed, indicating process_async should be called on
> +     * this netdev + qid again in the future.
> +     *
> +     * If called with force = true, can't return EINPROGRESS. Must handle stats
> +     * updates and any freeing of buffers even if they haven't been sent yet.
> +     */
> +    int (*process_async)(struct netdev *netdev, int qid, bool force);
> +

Are there any plans to add support for other netdev types?  For
example, dpdk netdevs?  What about linux netdevs?  That should come
along with this; otherwise this framework is just adding support to
vhost for something that could be done internally to the vhost device,
right?  I even think it could use an existing framework (like ovs_rcu)
to do the deferred cleanup / functions.  As I wrote above, why doesn't
the vhost library take care of using DMA devices and pushing packets?
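
For reference, my reading of the documented contract is something like
the sketch below (a hypothetical provider "foo"; the helper functions
are made up, and this patch still sets .process_async = NULL for the
vhost classes):

    /* Hypothetical process_async() illustrating the force/-EINPROGRESS
     * contract.  foo_tx_completed() and foo_reclaim_buffers() are made-up
     * helpers standing in for driver-specific completion polling and
     * buffer cleanup. */
    static int
    netdev_foo_process_async(struct netdev *netdev, int qid, bool force)
    {
        if (!foo_tx_completed(netdev, qid) && !force) {
            /* Async TX not finished yet; the caller should retry later. */
            return -EINPROGRESS;
        }

        /* TX completed, or force is set: free the buffers and update the
         * port stats now so the caller can move on. */
        foo_reclaim_buffers(netdev, qid);
        return 0;
    }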

>      /* Sets 'netdev''s Ethernet address to 'mac' */
>      int (*set_etheraddr)(struct netdev *netdev, const struct eth_addr mac);
>  
> diff --git a/lib/netdev.c b/lib/netdev.c
> index 8305f6c42..e122441cf 100644
> --- a/lib/netdev.c
> +++ b/lib/netdev.c
> @@ -892,7 +892,8 @@ netdev_send(struct netdev *netdev, int qid, struct dp_packet_batch *batch,
>      }
>  
>      error = netdev->netdev_class->send(netdev, qid, batch, concurrent_txq);
> -    if (!error) {
> +    /* For async, treat netdev_send as called when -EINPROGRESS is returned. */
> +    if (!error || error == -EINPROGRESS) {
>          COVERAGE_INC(netdev_sent);
>      }
>      return error;
Ilya Maximets Sept. 15, 2021, 2:24 p.m. UTC | #2
On 9/10/21 20:36, Aaron Conole wrote:
> Hi Cian,
> 
> Apologies for the mix of code and design comments.  I have lots of
> intermingled issues here (but I also didn't take too detailed a look at
> the implementation).
> 
> Cian Ferriter <cian.ferriter@intel.com> writes:
> 
>> These work rings help with handling the asynchronous TX use case. In
>> this use case, netdev_send is called, but the packets are not sent
>> immediately by the calling thread; they are instead handled by a
>> different resource. Since the TX is not instantaneous, the thread
>> calling netdev_send can't immediately free the packets being sent, or
>> report them as sent. Rather than having the thread poll for completion
>> of the TX, it is desirable for the thread to move on and process more
>> packets.
>>
>> The work ring serves as a FIFO queue to keep track of the asynchronous
>> TX calls that have been kicked off. Work is added/queued to the ring
>> when netdev_send returns '-EINPROGRESS', indicating that it kicked off
>> an asynchronous TX. Work is taken/dequeued from the ring in two main
>> cases:
>> 1. In pmd_thread_main, after processing every rxq assigned to the thread.
>> 2. When the ring is full while trying to queue work.
> 
> I'm not sold on the overall idea - why can't the vhost library perform
> the DMA internally?  Other devices (netdev-linux, for example) manage
> DMA transfers externally to OVS.  It seems like you're making a change
> here that will impact the whole PMD design for vhost only.

<snip>
 
> Are there any plans to add support for other netdev types?  For
> example, dpdk netdevs?  What about linux netdevs?  That should come
> along with this; otherwise this framework is just adding support to
> vhost for something that could be done internally to the vhost device,
> right?  I even think it could use an existing framework (like ovs_rcu)
> to do the deferred cleanup / functions.  As I wrote above, why doesn't
> the vhost library take care of using DMA devices and pushing packets?

Thanks, Aaron, for sharing your thoughts on the design of this feature.
I didn't look closely at the technical comments, but I completely agree
with the statements and questions above.

All network devices are asynchronous by their nature and DMA is handled
by their drivers.  vhost is an exception to this rule, so I don't see a
point in implementing all this machinery to bring the handling of a
special case up to a high application level.  DMA should be implemented
inside the vhost library and be completely hidden from the application,
just like DMA is handled for any other physical device driver.  Memory
copies at the device level are not something that OVS should care about.

Best regards, Ilya Maximets.

Patch

diff --git a/lib/automake.mk b/lib/automake.mk
index 46f869a33..0d910bc92 100644
--- a/lib/automake.mk
+++ b/lib/automake.mk
@@ -115,6 +115,7 @@  lib_libopenvswitch_la_SOURCES = \
 	lib/dpif-netdev-lookup-generic.c \
 	lib/dpif-netdev.c \
 	lib/dpif-netdev.h \
+	lib/dpif-netdev-private-defer.h \
 	lib/dpif-netdev-private-dfc.c \
 	lib/dpif-netdev-private-dfc.h \
 	lib/dpif-netdev-private-dpcls.h \
diff --git a/lib/dpif-netdev-perf.c b/lib/dpif-netdev-perf.c
index d7676ea2b..859ef300c 100644
--- a/lib/dpif-netdev-perf.c
+++ b/lib/dpif-netdev-perf.c
@@ -230,6 +230,7 @@  pmd_perf_format_overall_stats(struct ds *str, struct pmd_perf_stats *s,
     uint64_t tot_iter = histogram_samples(&s->pkts);
     uint64_t idle_iter = s->pkts.bin[0];
     uint64_t busy_iter = tot_iter >= idle_iter ? tot_iter - idle_iter : 0;
+    uint64_t work_deferred = stats[PMD_STAT_WORK_DEFER];
 
     ds_put_format(str,
             "  Iterations:        %12"PRIu64"  (%.2f us/it)\n"
@@ -284,7 +285,24 @@  pmd_perf_format_overall_stats(struct ds *str, struct pmd_perf_stats *s,
             tx_packets, (tx_packets / duration) / 1000,
             tx_batches, 1.0 * tx_packets / tx_batches);
     } else {
-        ds_put_format(str, "  Tx packets:        %12d\n\n", 0);
+        ds_put_format(str, "  Tx packets:        %12d\n", 0);
+    }
+    if (work_deferred > 0) {
+        uint64_t work_compl_checks = stats[PMD_STAT_WORK_IN_PROG] +
+                                     stats[PMD_STAT_WORK_DONE];
+
+        ds_put_format(str,
+            "  Work deferred:                   %12"PRIu64"\n"
+            "  - Deferred work done:            %12"PRIu64"\n"
+            "  - Work completion checks:        %12"PRIu64
+                                                "  (%.2f checks/work item)\n"
+            "  - Ring full when deferring work: %12"PRIu64"\n"
+            "  - Deferred work dropped:         %12"PRIu64"\n",
+            work_deferred, stats[PMD_STAT_WORK_DONE], work_compl_checks,
+            1.0 * work_compl_checks / stats[PMD_STAT_WORK_DONE],
+            stats[PMD_STAT_WORK_R_FULL], stats[PMD_STAT_WORK_DROPPED]);
+    } else {
+        ds_put_format(str, "  Work deferred:     %12d\n\n", 0);
     }
 }
 
diff --git a/lib/dpif-netdev-perf.h b/lib/dpif-netdev-perf.h
index 834c26260..e9c02a866 100644
--- a/lib/dpif-netdev-perf.h
+++ b/lib/dpif-netdev-perf.h
@@ -76,6 +76,15 @@  enum pmd_stat_type {
                              * recirculation. */
     PMD_STAT_SENT_PKTS,     /* Packets that have been sent. */
     PMD_STAT_SENT_BATCHES,  /* Number of batches sent. */
+    PMD_STAT_WORK_DEFER,    /* Number of times that work was deferred. */
+    PMD_STAT_WORK_IN_PROG,  /* Number of times that work was still in progress
+                               when checked by a thread. */
+    PMD_STAT_WORK_R_FULL,   /* Number of times work ring was full when
+                             * deferring work. */
+    PMD_STAT_WORK_DONE,     /* Number of times that deferred work was
+                             * completed. */
+    PMD_STAT_WORK_DROPPED,  /* Number of times that deferred work was dropped.
+                             */
     PMD_CYCLES_ITER_IDLE,   /* Cycles spent in idle iterations. */
     PMD_CYCLES_ITER_BUSY,   /* Cycles spent in busy iterations. */
     PMD_CYCLES_UPCALL,      /* Cycles spent processing upcalls. */
diff --git a/lib/dpif-netdev-private-defer.h b/lib/dpif-netdev-private-defer.h
new file mode 100644
index 000000000..78c140f56
--- /dev/null
+++ b/lib/dpif-netdev-private-defer.h
@@ -0,0 +1,84 @@ 
+/*
+ * Copyright (c) 2021 Intel Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef DPIF_NETDEV_PRIVATE_DEFER_H
+#define DPIF_NETDEV_PRIVATE_DEFER_H 1
+
+#include <stdbool.h>
+#include <stdint.h>
+
+#include "dpif.h"
+#include "dpif-netdev-perf.h"
+#include "cmap.h"
+
+#ifdef  __cplusplus
+extern "C" {
+#endif
+
+/* Function definition for deferred work. */
+typedef int (*dp_defer_work_func)(struct netdev *netdev, int qid, bool force);
+
+/* Structure to track outstanding work to be done. */
+struct dp_defer_work_item {
+    dp_defer_work_func work_func;
+    void *netdev;
+    int qid;
+    uint32_t attempts;
+};
+
+#define WORK_RING_SIZE 128
+#define WORK_RING_MASK (WORK_RING_SIZE - 1)
+
+#define ATTEMPT_LIMIT 1000
+
+/* The read and write indexes are between 0 and 2^32, and we mask their value
+ * when we access the work_ring[] array. */
+struct dp_defer {
+    uint32_t read_idx;
+    uint32_t write_idx;
+    struct dp_defer_work_item work_ring[WORK_RING_SIZE];
+};
+
+static inline void
+dp_defer_init(struct dp_defer *defer)
+{
+    defer->read_idx = 0;
+    defer->write_idx = 0;
+}
+
+static inline int
+dp_defer_work_ring_empty(const struct dp_defer *defer)
+{
+    return defer->write_idx == defer->read_idx;
+}
+
+static inline int
+dp_defer_work_ring_full(const struct dp_defer *defer)
+{
+    /* When the write index is exactly (WORK_RING_SIZE - 1) or WORK_RING_MASK
+     * elements ahead of the read index, the ring is full. When calculating the
+     * difference between the indexes, wraparound is not an issue since
+     * unsigned ints are used. */
+    uint16_t count = (defer->write_idx - defer->read_idx) & WORK_RING_MASK;
+
+    return count == WORK_RING_MASK;
+}
+
+#ifdef  __cplusplus
+}
+#endif
+
+#endif /* dpif-netdev-private-defer.h */
diff --git a/lib/dpif-netdev-private-thread.h b/lib/dpif-netdev-private-thread.h
index a782d9678..d14a5ade7 100644
--- a/lib/dpif-netdev-private-thread.h
+++ b/lib/dpif-netdev-private-thread.h
@@ -20,6 +20,7 @@ 
 
 #include "dpif.h"
 #include "dpif-netdev-perf.h"
+#include "dpif-netdev-private-defer.h"
 #include "dpif-netdev-private-dfc.h"
 #include "dpif-netdev-private-dpif.h"
 
@@ -219,6 +220,9 @@  struct dp_netdev_pmd_thread {
 
     /* Next time when PMD should try RCU quiescing. */
     long long next_rcu_quiesce;
+
+    /* Structure to track deferred work in this thread. */
+    struct dp_defer defer;
 };
 
 #ifdef  __cplusplus
diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index b3e57bb95..f4143a93a 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -435,6 +435,7 @@  struct tx_port {
     long long flush_time;
     struct dp_packet_batch output_pkts;
     struct dp_netdev_rxq *output_pkts_rxqs[NETDEV_MAX_BURST];
+    dp_defer_work_func cached_work_func;
 };
 
 /* Contained by struct tx_bond 'member_buckets'. */
@@ -4591,6 +4592,97 @@  pmd_perf_metrics_enabled(const struct dp_netdev_pmd_thread *pmd OVS_UNUSED)
 }
 #endif
 
+/* Try to do one piece of work from the work ring.
+ *
+ * Returns:
+ * -ENOENT: No work to do. The ring is empty.
+ * -EINPROGRESS: The work is still in progress, I can't do it.
+ * 0: One piece of work was taken from the ring. It was either successfully
+ *    handled, or dropped if attempted too many times.
+ */
+static inline unsigned int
+dp_defer_do_work(struct dp_defer *defer, struct pmd_perf_stats *perf_stats)
+{
+    struct dp_defer_work_item *work;
+    uint32_t read_idx;
+    int ret;
+
+    /* Check that there's a piece of work in the ring to do. */
+    if (dp_defer_work_ring_empty(defer)) {
+        return -ENOENT;
+    }
+
+    read_idx = defer->read_idx & WORK_RING_MASK;
+    work = &defer->work_ring[read_idx];
+    ret = work->work_func(work->netdev, work->qid, false);
+
+    if (ret == -EINPROGRESS) {
+        pmd_perf_update_counter(perf_stats, PMD_STAT_WORK_IN_PROG, 1);
+
+        work->attempts++;
+        if (work->attempts > ATTEMPT_LIMIT) {
+            ret = work->work_func(work->netdev, work->qid, true);
+            defer->read_idx++;
+
+            if (ret) {
+                pmd_perf_update_counter(perf_stats, PMD_STAT_WORK_DROPPED, 1);
+            } else {
+                pmd_perf_update_counter(perf_stats, PMD_STAT_WORK_DONE, 1);
+            }
+
+            return 0;
+        }
+
+        return ret;
+    }
+
+    defer->read_idx++;
+
+    pmd_perf_update_counter(perf_stats, PMD_STAT_WORK_DONE, 1);
+
+    return 0;
+}
+
+static inline void
+dp_defer_work(struct dp_defer *defer, struct pmd_perf_stats *perf_stats,
+              struct dp_defer_work_item *work)
+{
+    struct dp_defer_work_item *ring_item;
+    uint32_t write_idx;
+
+    /* Check that we have enough room in ring. */
+    if (dp_defer_work_ring_full(defer)) {
+        /* The work ring is full, try to make room by doing work. Doing work
+         * can fail to make room if the work has to be requeued. Keep trying to
+         * do work until there is room in the ring. */
+        pmd_perf_update_counter(perf_stats, PMD_STAT_WORK_R_FULL, 1);
+
+        while (dp_defer_do_work(defer, perf_stats)) {
+            continue;
+        }
+    }
+
+    write_idx = defer->write_idx & WORK_RING_MASK;
+    ring_item = &defer->work_ring[write_idx];
+
+    ring_item->work_func = work->work_func;
+    ring_item->netdev = work->netdev;
+    ring_item->qid = work->qid;
+    ring_item->attempts = 0;
+
+    defer->write_idx++;
+
+    pmd_perf_update_counter(perf_stats, PMD_STAT_WORK_DEFER, 1);
+}
+
+static inline void
+dp_defer_do_all_work(struct dp_defer *defer, struct pmd_perf_stats *perf_stats)
+{
+    while (dp_defer_do_work(defer, perf_stats) != -ENOENT) {
+        continue;
+    }
+}
+
 static int
 dp_netdev_pmd_flush_output_on_port(struct dp_netdev_pmd_thread *pmd,
                                    struct tx_port *p)
@@ -4600,10 +4692,12 @@  dp_netdev_pmd_flush_output_on_port(struct dp_netdev_pmd_thread *pmd,
     int output_cnt;
     bool dynamic_txqs;
     struct cycle_timer timer;
+    struct netdev *netdev;
     uint64_t cycles;
     uint32_t tx_flush_interval;
+    struct pmd_perf_stats *perf_stats = &pmd->perf_stats;
 
-    cycle_timer_start(&pmd->perf_stats, &timer);
+    cycle_timer_start(perf_stats, &timer);
 
     dynamic_txqs = p->port->dynamic_txqs;
     if (dynamic_txqs) {
@@ -4615,7 +4709,21 @@  dp_netdev_pmd_flush_output_on_port(struct dp_netdev_pmd_thread *pmd,
     output_cnt = dp_packet_batch_size(&p->output_pkts);
     ovs_assert(output_cnt > 0);
 
-    netdev_send(p->port->netdev, tx_qid, &p->output_pkts, dynamic_txqs);
+    netdev = p->port->netdev;
+    int ret = netdev_send(netdev, tx_qid, &p->output_pkts,
+                          dynamic_txqs);
+
+    if (ret == -EINPROGRESS) {
+        struct dp_defer_work_item work = {
+            .work_func = p->cached_work_func,
+            .netdev = netdev,
+            .qid = tx_qid,
+        };
+
+        /* Defer the work. */
+        dp_defer_work(&pmd->defer, perf_stats, &work);
+    }
+
     dp_packet_batch_init(&p->output_pkts);
 
     /* Update time of the next flush. */
@@ -4625,12 +4733,15 @@  dp_netdev_pmd_flush_output_on_port(struct dp_netdev_pmd_thread *pmd,
     ovs_assert(pmd->n_output_batches > 0);
     pmd->n_output_batches--;
 
-    pmd_perf_update_counter(&pmd->perf_stats, PMD_STAT_SENT_PKTS, output_cnt);
-    pmd_perf_update_counter(&pmd->perf_stats, PMD_STAT_SENT_BATCHES, 1);
+    /* The batch and number of packets are updated as sent here, even though
+     * some packets might have been dropped, or are in transit asynchronously.
+     */
+    pmd_perf_update_counter(perf_stats, PMD_STAT_SENT_PKTS, output_cnt);
+    pmd_perf_update_counter(perf_stats, PMD_STAT_SENT_BATCHES, 1);
 
     /* Distribute send cycles evenly among transmitted packets and assign to
      * their respective rx queues. */
-    cycles = cycle_timer_stop(&pmd->perf_stats, &timer) / output_cnt;
+    cycles = cycle_timer_stop(perf_stats, &timer) / output_cnt;
     for (i = 0; i < output_cnt; i++) {
         if (p->output_pkts_rxqs[i]) {
             dp_netdev_rxq_add_cycles(p->output_pkts_rxqs[i],
@@ -6196,6 +6307,7 @@  reload:
     ovs_mutex_lock(&pmd->perf_stats.stats_mutex);
     for (;;) {
         uint64_t rx_packets = 0, tx_packets = 0;
+        struct dp_defer *defer = &pmd->defer;
 
         pmd_perf_start_iteration(s);
 
@@ -6228,10 +6340,20 @@  reload:
             tx_packets = dp_netdev_pmd_flush_output_packets(pmd, false);
         }
 
+        /* Try to clear the work ring. If a piece of work is still in progress,
+         * don't attempt to do the remaining work items. They will be postponed
+         * to the next iteration of pmd_thread_main(). */
+        while (!dp_defer_do_work(defer, s)) {
+            continue;
+        }
+
         /* Do RCU synchronization at fixed interval.  This ensures that
          * synchronization would not be delayed long even at high load of
          * packet processing. */
         if (pmd->ctx.now > pmd->next_rcu_quiesce) {
+            /* Do any work outstanding on this PMD thread. */
+            dp_defer_do_all_work(defer, s);
+
             if (!ovsrcu_try_quiesce()) {
                 pmd->next_rcu_quiesce =
                     pmd->ctx.now + PMD_RCU_QUIESCE_INTERVAL;
@@ -6240,6 +6362,8 @@  reload:
 
         if (lc++ > 1024) {
             lc = 0;
+            /* Do any work outstanding on this PMD thread. */
+            dp_defer_do_all_work(defer, s);
 
             coverage_try_clear();
             dp_netdev_pmd_try_optimize(pmd, poll_list, poll_cnt);
@@ -6262,6 +6386,8 @@  reload:
 
         atomic_read_explicit(&pmd->reload, &reload, memory_order_acquire);
         if (OVS_UNLIKELY(reload)) {
+            /* Do any work outstanding on this PMD thread. */
+            dp_defer_do_all_work(defer, s);
             break;
         }
 
@@ -6748,6 +6874,8 @@  dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
     pmd_perf_stats_init(&pmd->perf_stats);
     cmap_insert(&dp->poll_threads, CONST_CAST(struct cmap_node *, &pmd->node),
                 hash_int(core_id, 0));
+
+    dp_defer_init(&pmd->defer);
 }
 
 static void
@@ -6918,6 +7046,7 @@  dp_netdev_add_port_tx_to_pmd(struct dp_netdev_pmd_thread *pmd,
     tx->qid = -1;
     tx->flush_time = 0LL;
     dp_packet_batch_init(&tx->output_pkts);
+    tx->cached_work_func = port->netdev->netdev_class->process_async;
 
     hmap_insert(&pmd->tx_ports, &tx->node, hash_port_no(tx->port->port_no));
     pmd->need_reload = true;
diff --git a/lib/netdev-dpdk.c b/lib/netdev-dpdk.c
index 45a96b9be..96d8210e3 100644
--- a/lib/netdev-dpdk.c
+++ b/lib/netdev-dpdk.c
@@ -2585,7 +2585,7 @@  netdev_dpdk_vhost_update_tx_counters(struct netdev_dpdk *dev,
     }
 }
 
-static void
+static int
 __netdev_dpdk_vhost_send(struct netdev *netdev, int qid,
                          struct dp_packet **pkts, int cnt)
 {
@@ -2667,6 +2667,8 @@  out:
     for (i = 0; i < n_packets_to_free; i++) {
         dp_packet_delete(pkts[i]);
     }
+
+    return 0;
 }
 
 static void
@@ -2774,7 +2776,7 @@  dpdk_copy_dp_packet_to_mbuf(struct rte_mempool *mp, struct dp_packet *pkt_orig)
 }
 
 /* Tx function. Transmit packets indefinitely */
-static void
+static int
 dpdk_do_tx_copy(struct netdev *netdev, int qid, struct dp_packet_batch *batch)
     OVS_NO_THREAD_SAFETY_ANALYSIS
 {
@@ -2793,6 +2795,7 @@  dpdk_do_tx_copy(struct netdev *netdev, int qid, struct dp_packet_batch *batch)
     uint32_t tx_failure = 0;
     uint32_t mtu_drops = 0;
     uint32_t qos_drops = 0;
+    int ret = 0;
 
     if (dev->type != DPDK_DEV_VHOST) {
         /* Check if QoS has been configured for this netdev. */
@@ -2826,7 +2829,7 @@  dpdk_do_tx_copy(struct netdev *netdev, int qid, struct dp_packet_batch *batch)
 
     if (OVS_LIKELY(txcnt)) {
         if (dev->type == DPDK_DEV_VHOST) {
-            __netdev_dpdk_vhost_send(netdev, qid, pkts, txcnt);
+            ret = __netdev_dpdk_vhost_send(netdev, qid, pkts, txcnt);
         } else {
             tx_failure += netdev_dpdk_eth_tx_burst(dev, qid,
                                                    (struct rte_mbuf **)pkts,
@@ -2843,6 +2846,8 @@  dpdk_do_tx_copy(struct netdev *netdev, int qid, struct dp_packet_batch *batch)
         sw_stats->tx_qos_drops += qos_drops;
         rte_spinlock_unlock(&dev->stats_lock);
     }
+
+    return ret;
 }
 
 static int
@@ -2851,14 +2856,15 @@  netdev_dpdk_vhost_send(struct netdev *netdev, int qid,
                        bool concurrent_txq OVS_UNUSED)
 {
 
+    int ret = 0;
     if (OVS_UNLIKELY(batch->packets[0]->source != DPBUF_DPDK)) {
-        dpdk_do_tx_copy(netdev, qid, batch);
+        ret = dpdk_do_tx_copy(netdev, qid, batch);
         dp_packet_delete_batch(batch, true);
     } else {
-        __netdev_dpdk_vhost_send(netdev, qid, batch->packets,
-                                 dp_packet_batch_size(batch));
+        ret = __netdev_dpdk_vhost_send(netdev, qid, batch->packets,
+                                       dp_packet_batch_size(batch));
     }
-    return 0;
+    return ret;
 }
 
 static inline void
@@ -5468,6 +5474,7 @@  static const struct netdev_class dpdk_vhost_class = {
     .construct = netdev_dpdk_vhost_construct,
     .destruct = netdev_dpdk_vhost_destruct,
     .send = netdev_dpdk_vhost_send,
+    .process_async = NULL,
     .get_carrier = netdev_dpdk_vhost_get_carrier,
     .get_stats = netdev_dpdk_vhost_get_stats,
     .get_custom_stats = netdev_dpdk_get_sw_custom_stats,
@@ -5484,6 +5491,7 @@  static const struct netdev_class dpdk_vhost_client_class = {
     .destruct = netdev_dpdk_vhost_destruct,
     .set_config = netdev_dpdk_vhost_client_set_config,
     .send = netdev_dpdk_vhost_send,
+    .process_async = NULL,
     .get_carrier = netdev_dpdk_vhost_get_carrier,
     .get_stats = netdev_dpdk_vhost_get_stats,
     .get_custom_stats = netdev_dpdk_get_sw_custom_stats,
diff --git a/lib/netdev-provider.h b/lib/netdev-provider.h
index b5420947d..a448328e7 100644
--- a/lib/netdev-provider.h
+++ b/lib/netdev-provider.h
@@ -384,7 +384,11 @@  struct netdev_class {
      * if it would always return EOPNOTSUPP anyhow.  (This will prevent the
      * network device from being usefully used by the netdev-based "userspace
      * datapath".  It will also prevent the OVS implementation of bonding from
-     * working properly over 'netdev'.) */
+     * working properly over 'netdev'.)
+     *
+     * May return EINPROGRESS. This indicates that the netdev has more work to
+     * do, and needs to have process_async called before sending buffers is
+     * totally completed. */
     int (*send)(struct netdev *netdev, int qid, struct dp_packet_batch *batch,
                 bool concurrent_txq);
 
@@ -402,6 +406,19 @@  struct netdev_class {
      * implement packet transmission through the 'send' member function. */
     void (*send_wait)(struct netdev *netdev, int qid);
 
+    /* Performs asynchronous work required by the netdev to complete sending
+     * buffers. The work done in the process_async function is netdev specific,
+     * but could include freeing packets or updating port stats.
+     *
+     * If called with force = false, may return EINPROGRESS if the async call
+     * still hasn't completed, indicating process_async should be called on
+     * this netdev + qid again in the future.
+     *
+     * If called with force = true, can't return EINPROGRESS. Must handle stats
+     * updates and any freeing of buffers even if they haven't been sent yet.
+     */
+    int (*process_async)(struct netdev *netdev, int qid, bool force);
+
     /* Sets 'netdev''s Ethernet address to 'mac' */
     int (*set_etheraddr)(struct netdev *netdev, const struct eth_addr mac);
 
diff --git a/lib/netdev.c b/lib/netdev.c
index 8305f6c42..e122441cf 100644
--- a/lib/netdev.c
+++ b/lib/netdev.c
@@ -892,7 +892,8 @@  netdev_send(struct netdev *netdev, int qid, struct dp_packet_batch *batch,
     }
 
     error = netdev->netdev_class->send(netdev, qid, batch, concurrent_txq);
-    if (!error) {
+    /* For async, treat netdev_send as called when -EINPROGRESS is returned. */
+    if (!error || error == -EINPROGRESS) {
         COVERAGE_INC(netdev_sent);
     }
     return error;