diff mbox series

[ovs-dev,37/41] dpif-offload-dpdk: Abstract rte_flow implementation from dpif-netdev.

Message ID 2a063b239d7295d23f89a306406e720c375c297d.1762950453.git.echaudro@redhat.com
State Changes Requested
Headers show
Series Architectural refactoring of hardware offload infrastructure | expand

Checks

Context Check Description
ovsrobot/apply-robot warning apply and check: warning
ovsrobot/github-robot-_Build_and_Test success github build: passed
ovsrobot/github-robot-_Build_and_Test success github build: passed

Commit Message

Eelco Chaudron Nov. 12, 2025, 3:35 p.m. UTC
The generic dpif-netdev datapath was tightly coupled with the
rte_flow offload implementation. It contained logic for managing
offload threads, queues, and flow mark associations that was
specific to rte_flow.

This commit decouples the two by moving all rte_flow-specific
logic into the dpif-offload-dpdk.c provider. The dpif-offload
API is extended to support asynchronous flow operations (put/del)
with callbacks, making dpif-netdev a generic client of the
offload provider.

This abstraction results in a cleaner separation of concerns,
simplifying the dpif-netdev code and making the offload
framework more modular and maintainable.

Signed-off-by: Eelco Chaudron <echaudro@redhat.com>
---
 lib/automake.mk                 |    1 +
 lib/dpif-netdev-private-flow.h  |   10 +-
 lib/dpif-netdev.c               | 1193 +++++++------------------------
 lib/dpif-netdev.h               |   12 -
 lib/dpif-offload-dpdk-private.h |   39 +
 lib/dpif-offload-dpdk.c         |  887 ++++++++++++++++++++++-
 lib/dpif-offload-provider.h     |   30 +-
 lib/dpif-offload-tc.c           |    8 -
 lib/dpif-offload.c              |  124 +---
 lib/dpif-offload.h              |   42 +-
 lib/netdev-offload-dpdk.c       |  545 +++++++++++---
 lib/netdev-offload-dpdk.h       |   65 +-
 12 files changed, 1709 insertions(+), 1247 deletions(-)
 create mode 100644 lib/dpif-offload-dpdk-private.h

Comments

0-day Robot Nov. 14, 2025, 2:17 p.m. UTC | #1
git sha: 2a063b239d7295d23f89a306406e720c375c297d
Author: Eelco Chaudron <echaudro@redhat.com>
Subject: dpif-offload-dpdk: Abstract rte_flow implementation from dpif-netdev.

This commit abstracts the rte_flow-specific logic from dpif-netdev into 
the dpif-offload-dpdk provider, making the generic datapath implementation 
independent of specific offload technologies.

> diff --git a/lib/automake.mk b/lib/automake.mk
> index f7586bb0a..c0edef8e9 100644
> --- a/lib/automake.mk
> +++ b/lib/automake.mk
> @@ -508,6 +508,7 @@ if DPDK_NETDEV
>  lib_libopenvswitch_la_SOURCES += \
>  	lib/dpdk.c \
>  	lib/dpif-offload-dpdk.c \
> +	lib/dpif-offload-dpdk-private.h \
>  	lib/netdev-dpdk.c \
>  	lib/netdev-offload-dpdk.c \
>  	lib/netdev-offload-dpdk.h
> diff --git a/lib/dpif-netdev-private-flow.h b/lib/dpif-netdev-private-flow.h
> index 7425dd44e..753358925 100644
> --- a/lib/dpif-netdev-private-flow.h
> +++ b/lib/dpif-netdev-private-flow.h
> @@ -89,7 +89,7 @@ struct dp_netdev_flow {
>                                   /* 'flow_table'. */
>      const struct cmap_node simple_match_node; /* In dp_netdev_pmd_thread's
>                                                   'simple_match_table'. */
> -    const struct cmap_node mark_node; /* In owning flow_mark's mark_to_flow */
> +    const struct cmap_node mufid_node; /* In dp_netdev's 'mufid_to_flow'. */
>      const ovs_u128 ufid;         /* Unique flow identifier. */
>      const ovs_u128 mega_ufid;    /* Unique mega flow identifier. */
>      const unsigned pmd_id;       /* The 'core_id' of pmd thread owning this */

Does this field change from mark_node to mufid_node actually get used 
anywhere in the code? I don't see any references to mufid_node being 
added or any mufid_to_flow map being created.

> @@ -102,18 +102,14 @@ struct dp_netdev_flow {
>      struct ovs_refcount ref_cnt;
>  
>      bool dead;
> -    uint32_t mark;               /* Unique flow mark for netdev offloading. */
> +    bool offloaded;
> +    atomic_int offload_queue_depth;
>      uint64_t simple_match_mark;  /* Unique flow mark for the simple match. */
>      odp_port_t orig_in_port;
>  
>      /* Statistics. */
>      struct dp_netdev_flow_stats stats;
>  
> -    /* Statistics and attributes received from the netdev offload provider. */
> -    atomic_int netdev_flow_get_result;
> -    struct dp_netdev_flow_stats last_stats;
> -    struct dp_netdev_flow_attrs last_attrs;
> -
>      /* Actions. */
>      OVSRCU_TYPE(struct dp_netdev_actions *) actions;
>  
> diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c

[ ... ]

> @@ -2463,137 +2463,171 @@ log_netdev_flow_change(const struct dp_netdev_flow *flow,
>      ds_destroy(&ds);
>  }
>  
> -static void
> -queue_netdev_flow_put(struct dp_netdev_pmd_thread *pmd,
> -                      struct dp_netdev_flow *flow, struct match *match,
> -                      const struct nlattr *actions, size_t actions_len,
> -                      int op)
> +/* Offloaded flows can be handled asynchronously, so we do not always know
> + * whether a specific flow is offloaded or not.  It might still be pending;
> + * in fact, multiple modifications can be pending, and the actual offload
> + * state depends on the completion of each modification.
> + *
> + * To correctly determine whether a flow is offloaded when it is being
> + * destroyed (and therefore requires cleanup), we must ensure that all
> + * operations have completed.  To achieve this, we track the number of
> + * outstanding offloaded flow modifications. */
> +static bool
> +offload_queue_inc(struct dp_netdev_flow *flow)
>  {
> -    struct dp_offload_thread_item *item;
> -    struct dp_offload_flow_item *flow_offload;
> +    int current;
>  
> -    if (!dpif_offload_is_offload_enabled()) {
> -        return;
> +    while (true) {
> +        atomic_read(&flow->offload_queue_depth, &current);
> +        if (current < 0) {
> +            /* We are cleaning up, so no longer enqueue operations. */
> +            return false;
> +        }
> +
> +        /* Here we try to atomically increase the value.  If we do not succeed,
> +         * someone else has modified it, and we need to check again for a
> +         * current negative value. */
> +        if (atomic_compare_exchange_strong(&flow->offload_queue_depth,
> +                                           &current, current + 1)) {
> +            return true;
> +        }
>      }

This retry loop in offload_queue_inc() follows the standard compare-and-exchange
pattern, but it spins without bound under contention.  Has it been confirmed that
forward progress is guaranteed when multiple threads continuously modify
offload_queue_depth?

> +}
> +
> +static bool
> +offload_queue_dec(struct dp_netdev_flow *flow)
> +{
> +    int old;
> +
> +    atomic_sub(&flow->offload_queue_depth, 1, &old);
> +    ovs_assert(old >= 1);
> +
> +    if (old == 1) {
> +        /* Note that this only indicates that the queue might be empty. */
> +        return true;
> +    }
> +    return false;
> +}
> +
> +static bool
> +offload_queue_complete(struct dp_netdev_flow *flow)
> +{
> +    /* This function returns false if the queue is still in use.
> +     * If the queue is empty, it will attempt to atomically mark it as
> +     * 'not in use' by making the queue depth negative.  This prevents
> +     * other flow operations from being added.  If successful, it returns
> +     * true. */
> +     int expected_val = 0;
> +
> +    return atomic_compare_exchange_strong(&flow->offload_queue_depth,
> +                                          &expected_val, -1);
> +}

Is there a potential race condition between offload_queue_dec() returning 
true and offload_queue_complete() being called where another thread could 
increment the queue depth?

> +static void
> +offload_flow_reference_unreference_cb(unsigned pmd_id OVS_UNUSED,
> +                                      void *flow_reference_)
> +{
> +    struct dp_netdev_flow *flow_reference = flow_reference_;
> +
> +    if (flow_reference) {
> +        flow_reference->offloaded = false;
> +        dp_netdev_flow_unref(flow_reference);
> +    }
> +}

[ ... ]

> @@ -4185,12 +3596,10 @@ dp_netdev_flow_add(struct dp_netdev_pmd_thread *pmd,
>      /* Do not allocate extra space. */
>      flow = xmalloc(sizeof *flow - sizeof flow->cr.flow.mf + mask.len);
>      memset(&flow->stats, 0, sizeof flow->stats);
> -    atomic_init(&flow->netdev_flow_get_result, 0);
> -    memset(&flow->last_stats, 0, sizeof flow->last_stats);
> -    memset(&flow->last_attrs, 0, sizeof flow->last_attrs);
>      flow->dead = false;
> +    flow->offloaded = false;
> +    atomic_init(&flow->offload_queue_depth, 0);
>      flow->batch = NULL;
> -    flow->mark = INVALID_FLOW_MARK;
>      flow->orig_in_port = orig_in_port;
>      *CONST_CAST(unsigned *, &flow->pmd_id) = pmd->core_id;
>      *CONST_CAST(struct flow *, &flow->flow) = match->flow;

[ ... ]

> diff --git a/lib/dpif-offload-dpdk.c b/lib/dpif-offload-dpdk.c
> index 5c3e7f041..547bb68ef 100644
> --- a/lib/dpif-offload-dpdk.c
> +++ b/lib/dpif-offload-dpdk.c

[ ... ]

> +uint32_t
> +dpif_offload_dpdk_allocate_flow_mark(struct dpif_offload_dpdk *offload)
> +{
> +    static struct ovsthread_once init_once = OVSTHREAD_ONCE_INITIALIZER;
> +    unsigned int uid = dpdk_offload_thread_id() \
> +                       % offload->offload_thread_count;
> +    uint32_t flow_mark;
> +
> +    if (ovsthread_once_start(&init_once)) {
> +        /* Haven't initiated yet, do it here. */
> +        offload->flow_mark_pool = id_fpool_create(
> +            offload->offload_thread_count, 1, UINT32_MAX - 1);
> +        ovsthread_once_done(&init_once);
> +    }

Does this static ovsthread_once cause issues when multiple dpif_offload_dpdk 
instances exist? The flow mark pool initialization seems to only happen once 
globally but each offload instance might need its own pool.

> +
> +    if (id_fpool_new_id(offload->flow_mark_pool, uid, &flow_mark)) {
> +        return flow_mark;
> +    }
> +
> +    return INVALID_FLOW_MARK;
> +}

[ ... ]

> +static void
> +dpif_offload_dpdk_offload_threads_init(struct dpif_offload_dpdk *offload)
> +{
> +    offload->offload_threads = xcalloc(offload->offload_thread_count,
> +                                       sizeof(struct dpdk_offload_thread));
> +
> +    for (unsigned int tid = 0; tid < offload->offload_thread_count; tid++) {
> +        struct dpdk_offload_thread *thread;
> +
> +        thread = &offload->offload_threads[tid];
> +        mpsc_queue_init(&thread->queue);
> +        atomic_init(&thread->enqueued_item, 0);
> +        mov_avg_cma_init(&thread->cma);
> +        mov_avg_ema_init(&thread->ema, 100);
> +        thread->offload = offload;
> +        thread->thread = ovs_thread_create(
> +            "dpdk_offload", dpif_offload_dpdk_offload_thread_main, thread);
> +    }
> +}

Are there any checks to ensure offload_thread_count is reasonable? Could 
this code handle the case where offload_thread_count is extremely large 
and cause resource exhaustion?

[ ... ]

> diff --git a/lib/netdev-offload-dpdk.c b/lib/netdev-offload-dpdk.c

[ ... ]

> +static struct pmd_data *
> +netdev_offload_dpdk_pmd_data_init(void)
> +{
> +    struct pmd_data *mapping = xmalloc(sizeof *mapping);
> +
> +    ovs_mutex_init(&mapping->map_lock);
> +    cmap_init(&mapping->pmd_id_to_flow_ref);
> +    return mapping;
> +}

Does this function handle the case where xmalloc() fails?  Note that OVS's
xmalloc() (lib/util.c) aborts the process on allocation failure rather than
returning NULL, so an explicit check here may be unnecessary -- please confirm.

> +static void
> +netdev_offload_dpdk_pmd_data_cleanup_mappings(
> +    struct dpif_offload_dpdk *offload, struct pmd_data *mapping)
> +{
> +    struct pmd_id_to_flow_ref_data *data;
> +
> +    if (!mapping) {
> +        return;
> +    }
> +
> +    ovs_mutex_lock(&mapping->map_lock);
> +
> +    CMAP_FOR_EACH (data, node, &mapping->pmd_id_to_flow_ref) {
> +        cmap_remove(&mapping->pmd_id_to_flow_ref, &data->node,
> +                    hash_int(data->pmd_id, 0));
> +
> +        dpif_offload_dpdk_flow_unreference(offload, data->pmd_id,
> +                                           data->flow_reference);
> +        ovsrcu_postpone(free, data);
> +    }
> +
> +    ovs_mutex_unlock(&mapping->map_lock);
> +}

Could this cleanup function cause issues if dpif_offload_dpdk_flow_unreference() 
callbacks try to access the mapping being cleaned up?

[ ... ]

> @@ -2506,13 +2554,15 @@ out:
>  }
>  
>  static struct ufid_to_rte_flow_data *
> -netdev_offload_dpdk_add_flow(struct netdev *netdev,
> +netdev_offload_dpdk_add_flow(struct dpif_offload_dpdk *offload,
> +                             struct pmd_data *pmd_mapping,
> +                             struct netdev *netdev,
>                               struct match *match,
>                               struct nlattr *nl_actions,
>                               size_t actions_len,
>                               const ovs_u128 *ufid,
> -                             struct dpif_netdev_offload_info *info)
> +                             uint32_t flow_mark,
> +                             odp_port_t orig_in_port)
>  {
>      struct flow_patterns patterns = {
>          .items = NULL,
> @@ -2523,20 +2573,20 @@ netdev_offload_dpdk_add_flow(struct netdev *netdev,
>      bool actions_offloaded = true;
>      struct rte_flow *flow;
>  
> -    if (parse_flow_match(netdev, info->orig_in_port, &patterns, match)) {
> +    if (parse_flow_match(offload, netdev, orig_in_port, &patterns, match)) {
>          VLOG_DBG_RL(&rl, "%s: matches of ufid "UUID_FMT" are not supported",
>                      netdev_get_name(netdev), UUID_ARGS((struct uuid *) ufid));
>          goto out;
>      }
>  
> -    flow = netdev_offload_dpdk_actions(patterns.physdev, &patterns, nl_actions,
> -                                       actions_len);
> +    flow = netdev_offload_dpdk_actions(offload, patterns.physdev, &patterns,
> +                                       nl_actions, actions_len);
>      if (!flow && !netdev_vport_is_vport_class(netdev->netdev_class)) {
>          /* If we failed to offload the rule actions fallback to MARK+RSS
>           * actions.
>           */
>          flow = netdev_offload_dpdk_mark_rss(&patterns, netdev,
> -                                            info->flow_mark);
> +                                            flow_mark);
>          actions_offloaded = false;
>      }
>  
> @@ -2544,7 +2594,8 @@ netdev_offload_dpdk_add_flow(struct netdev *netdev,
>          goto out;
>      }
>      flows_data = ufid_to_rte_flow_associate(ufid, netdev, patterns.physdev,
> -                                            flow, actions_offloaded);
> +                                            flow, actions_offloaded,
> +                                            flow_mark, pmd_mapping);
>      VLOG_DBG("%s/%s: installed flow %p by ufid "UUID_FMT,
>               netdev_get_name(netdev), netdev_get_name(patterns.physdev), flow,
>               UUID_ARGS((struct uuid *) ufid));

[ ... ]

> @@ -2734,21 +2784,54 @@ netdev_offload_dpdk_flow_put(struct dpif_offload_dpdk *offload,
>          dpif_netdev_offload_ports_traverse(get_netdev_odp_cb, &aux);
>          orig_in_port = aux.odp_port;
>          old_stats = rte_flow_data->stats;
>          modification = true;
> -        ret = netdev_offload_dpdk_flow_destroy(rte_flow_data);
> +        pmd_mapping = ovsrcu_get(struct pmd_data *,
> +                                 &rte_flow_data->pmd_mapping);
> +        ovsrcu_set(&rte_flow_data->pmd_mapping, NULL);
> +        flow_mark = rte_flow_data->flow_mark;
> +
> +        ret = netdev_offload_dpdk_flow_destroy(offload, rte_flow_data,
> +                                               false, true);
>          if (ret < 0) {
>              return ret;
>          }
> +    } else if (!rte_flow_data) {
> +        pmd_mapping = netdev_offload_dpdk_pmd_data_init();
> +        netdev_offload_dpdk_pmd_data_associate(pmd_mapping, pmd_id,
> +                                               flow_reference);
> +        *previous_flow_reference = NULL;
> +        flow_mark = dpif_offload_dpdk_allocate_flow_mark(offload);
> +    } else /* if (rte_flow_data) */ {
> +        pmd_mapping = ovsrcu_get(struct pmd_data *,
> +                                 &rte_flow_data->pmd_mapping);
> +
> +        netdev_offload_dpdk_pmd_data_associate(pmd_mapping, pmd_id,
> +                                               flow_reference);
> +        *previous_flow_reference = NULL;
> +    }

In the middle else clause, flow_mark is allocated but what happens if 
the subsequent netdev_offload_dpdk_add_flow() fails? Does the allocated 
flow_mark get properly freed?

> +
> +    if (modification || !rte_flow_data) {
> +        rte_flow_data = netdev_offload_dpdk_add_flow(offload, pmd_mapping,
> +                                                     netdev, match,
> +                                                     actions, actions_len,
> +                                                     ufid, flow_mark,
> +                                                     orig_in_port);
> +        if (!rte_flow_data) {
> +            /* Clean up existing mappings, except for the current pmd_id one,
> +             * as this is handled through the callback. */
> +            netdev_offload_dpdk_pmd_data_disassociate(pmd_mapping, pmd_id);
> +            netdev_offload_dpdk_pmd_data_cleanup_mappings(offload,
> +                                                          pmd_mapping);
> +            ovsrcu_postpone(netdev_offload_dpdk_pmd_data_cleanup_mapping,
> +                            pmd_mapping);
> +            dpif_offload_dpdk_free_flow_mark(offload, flow_mark);
> +            return -1;
> +        }
> +    }

Good, the flow_mark cleanup is handled in the error path.

[ ... ]

> diff --git a/lib/netdev-offload-dpdk.h b/lib/netdev-offload-dpdk.h

[ ... ]

> -int netdev_offload_dpdk_init(struct netdev *);
> +int netdev_offload_dpdk_init(struct netdev *,
> +                             unsigned int offload_thread_count);
>  void netdev_offload_dpdk_uninit(struct netdev *);
> -int netdev_offload_dpdk_flow_flush(struct netdev *);
> -uint64_t netdev_offload_dpdk_flow_get_n_offloaded(struct netdev *);
> -int netdev_offload_dpdk_hw_miss_packet_recover(struct netdev *,
> -                                               struct dp_packet *);
> -#ifdef DPDK_NETDEV
> -int netdev_offload_dpdk_flow_put(struct netdev *, struct match *,
> +int netdev_offload_dpdk_flow_flush(struct dpif_offload_dpdk *,
> +                                   struct netdev *);
> +uint64_t netdev_offload_dpdk_flow_get_n_offloaded(
> +    struct netdev *, unsigned int offload_thread_count);
> +uint64_t netdev_offload_dpdk_flow_get_n_offloaded_by_thread(
> +    struct netdev *, unsigned int tid);
> +int netdev_offload_dpdk_hw_miss_packet_recover(struct dpif_offload_dpdk *,
> +                                               struct netdev *,
> +                                               unsigned pmd_id,
> +                                               struct dp_packet *,
> +                                               void **flow_reference);
> +int netdev_offload_dpdk_flow_put(struct dpif_offload_dpdk *,
> +                                 unsigned pmd_id, void *flow_reference,
> +                                 struct netdev *, struct match *,
>                                   struct nlattr *actions, size_t actions_len,
>                                   const ovs_u128 *ufid,
> -                                 struct dpif_netdev_offload_info *,
> +                                 odp_port_t orig_in_port,
> +                                 void **previous_flow_reference,
>                                   struct dpif_flow_stats *);
> -int netdev_offload_dpdk_flow_del(struct netdev *, const ovs_u128 *ufid,
> +int netdev_offload_dpdk_flow_del(struct dpif_offload_dpdk *, struct netdev *,
> +                                 unsigned pmd_id, const ovs_u128 *ufid,
> +                                 void *flow_reference,
>                                   struct dpif_flow_stats *);
>  int netdev_offload_dpdk_flow_get(struct netdev *, struct match *,
>                                   struct nlattr **actions, const ovs_u128 *ufid,
>                                   struct dpif_flow_stats *,
>                                   struct dpif_flow_attrs *, struct ofpbuf *buf);

The API changes look comprehensive. Are all the function signature changes 
backwards compatible with existing callers?

> -#else

The removal of the #ifdef DPDK_NETDEV block and associated stub functions 
seems correct for this abstraction.

Overall, this is a substantial refactoring that appears to properly separate 
concerns, though there are several potential concurrency and error handling 
issues to consider.
diff mbox series

Patch

diff --git a/lib/automake.mk b/lib/automake.mk
index f7586bb0a..c0edef8e9 100644
--- a/lib/automake.mk
+++ b/lib/automake.mk
@@ -508,6 +508,7 @@  if DPDK_NETDEV
 lib_libopenvswitch_la_SOURCES += \
 	lib/dpdk.c \
 	lib/dpif-offload-dpdk.c \
+	lib/dpif-offload-dpdk-private.h \
 	lib/netdev-dpdk.c \
 	lib/netdev-offload-dpdk.c \
 	lib/netdev-offload-dpdk.h
diff --git a/lib/dpif-netdev-private-flow.h b/lib/dpif-netdev-private-flow.h
index 7425dd44e..753358925 100644
--- a/lib/dpif-netdev-private-flow.h
+++ b/lib/dpif-netdev-private-flow.h
@@ -89,7 +89,7 @@  struct dp_netdev_flow {
                                  /* 'flow_table'. */
     const struct cmap_node simple_match_node; /* In dp_netdev_pmd_thread's
                                                  'simple_match_table'. */
-    const struct cmap_node mark_node; /* In owning flow_mark's mark_to_flow */
+    const struct cmap_node mufid_node; /* In dp_netdev's 'mufid_to_flow'. */
     const ovs_u128 ufid;         /* Unique flow identifier. */
     const ovs_u128 mega_ufid;    /* Unique mega flow identifier. */
     const unsigned pmd_id;       /* The 'core_id' of pmd thread owning this */
@@ -102,18 +102,14 @@  struct dp_netdev_flow {
     struct ovs_refcount ref_cnt;
 
     bool dead;
-    uint32_t mark;               /* Unique flow mark for netdev offloading. */
+    bool offloaded;
+    atomic_int offload_queue_depth;
     uint64_t simple_match_mark;  /* Unique flow mark for the simple match. */
     odp_port_t orig_in_port;
 
     /* Statistics. */
     struct dp_netdev_flow_stats stats;
 
-    /* Statistics and attributes received from the netdev offload provider. */
-    atomic_int netdev_flow_get_result;
-    struct dp_netdev_flow_stats last_stats;
-    struct dp_netdev_flow_attrs last_attrs;
-
     /* Actions. */
     OVSRCU_TYPE(struct dp_netdev_actions *) actions;
 
diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index 6324cfb7a..68d0a65fb 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -121,9 +121,7 @@  COVERAGE_DEFINE(datapath_drop_invalid_port);
 COVERAGE_DEFINE(datapath_drop_invalid_bond);
 COVERAGE_DEFINE(datapath_drop_invalid_tnl_port);
 COVERAGE_DEFINE(datapath_drop_rx_invalid_packet);
-#ifdef ALLOW_EXPERIMENTAL_API /* Packet restoration API required. */
 COVERAGE_DEFINE(datapath_drop_hw_miss_postprocess);
-#endif
 
 /* Protects against changes to 'dp_netdevs'. */
 struct ovs_mutex dp_netdev_mutex = OVS_MUTEX_INITIALIZER;
@@ -359,97 +357,6 @@  enum rxq_cycles_counter_type {
     RXQ_N_CYCLES
 };
 
-enum dp_offload_type {
-    DP_OFFLOAD_FLOW,
-    DP_OFFLOAD_FLUSH,
-};
-
-enum {
-    DP_NETDEV_FLOW_OFFLOAD_OP_ADD,
-    DP_NETDEV_FLOW_OFFLOAD_OP_MOD,
-    DP_NETDEV_FLOW_OFFLOAD_OP_DEL,
-};
-
-struct dp_offload_flow_item {
-    struct dp_netdev_flow *flow;
-    int op;
-    struct match match;
-    struct nlattr *actions;
-    size_t actions_len;
-    odp_port_t orig_in_port; /* Originating in_port for tnl flows. */
-};
-
-struct dp_offload_flush_item {
-    struct netdev *netdev;
-    struct ovs_barrier *barrier;
-};
-
-union dp_offload_thread_data {
-    struct dp_offload_flow_item flow;
-    struct dp_offload_flush_item flush;
-};
-
-struct dp_offload_thread_item {
-    struct mpsc_queue_node node;
-    enum dp_offload_type type;
-    long long int timestamp;
-    struct dp_netdev *dp;
-    union dp_offload_thread_data data[0];
-};
-
-struct dp_offload_thread {
-    PADDED_MEMBERS(CACHE_LINE_SIZE,
-        struct mpsc_queue queue;
-        atomic_uint64_t enqueued_item;
-        struct cmap megaflow_to_mark;
-        struct cmap mark_to_flow;
-        struct mov_avg_cma cma;
-        struct mov_avg_ema ema;
-    );
-};
-static struct dp_offload_thread *dp_offload_threads;
-static void *dp_netdev_flow_offload_main(void *arg);
-
-/* XXX: Temporarily forward declarations, will be removed during cleanup. */
-static unsigned int dpdk_offload_ufid_to_thread_id(const ovs_u128 ufid);
-static unsigned int dpdk_offload_thread_init(void);
-void dpdk_offload_thread_set_thread_nb(unsigned int thread_nb);
-unsigned int dpdk_offload_thread_nb(void);
-unsigned int dpdk_offload_thread_id(void);
-
-/* XXX: Temporarily external declarations, will be removed during cleanup. */
-struct netdev *dpif_netdev_offload_get_netdev_by_port_id(odp_port_t);
-
-static void
-dp_netdev_offload_init(void)
-{
-    static struct ovsthread_once once = OVSTHREAD_ONCE_INITIALIZER;
-    unsigned int nb_offload_thread = dpdk_offload_thread_nb();
-    unsigned int tid;
-
-    if (!ovsthread_once_start(&once)) {
-        return;
-    }
-
-    dp_offload_threads = xcalloc(nb_offload_thread,
-                                 sizeof *dp_offload_threads);
-
-    for (tid = 0; tid < nb_offload_thread; tid++) {
-        struct dp_offload_thread *thread;
-
-        thread = &dp_offload_threads[tid];
-        mpsc_queue_init(&thread->queue);
-        cmap_init(&thread->megaflow_to_mark);
-        cmap_init(&thread->mark_to_flow);
-        atomic_init(&thread->enqueued_item, 0);
-        mov_avg_cma_init(&thread->cma);
-        mov_avg_ema_init(&thread->ema, 100);
-        ovs_thread_create("dpdk_offload", dp_netdev_flow_offload_main, thread);
-    }
-
-    ovsthread_once_done(&once);
-}
-
 #define XPS_TIMEOUT 500000LL    /* In microseconds. */
 
 /* Contained by struct dp_netdev_port's 'rxqs' member.  */
@@ -622,9 +529,6 @@  static void dp_netdev_del_bond_tx_from_pmd(struct dp_netdev_pmd_thread *pmd,
                                            uint32_t bond_id)
     OVS_EXCLUDED(pmd->bond_mutex);
 
-static void dp_netdev_offload_flush(struct dp_netdev *dp,
-                                    struct dp_netdev_port *port);
-
 static void reconfigure_datapath(struct dp_netdev *dp)
     OVS_REQ_RDLOCK(dp->port_rwlock);
 static bool dp_netdev_pmd_try_ref(struct dp_netdev_pmd_thread *pmd);
@@ -662,8 +566,6 @@  dp_netdev_pmd_lookup_dpcls(struct dp_netdev_pmd_thread *pmd,
 static void dp_netdev_request_reconfigure(struct dp_netdev *dp);
 static inline bool
 pmd_perf_metrics_enabled(const struct dp_netdev_pmd_thread *pmd);
-static void queue_netdev_flow_del(struct dp_netdev_pmd_thread *pmd,
-                                  struct dp_netdev_flow *flow);
 
 static void dp_netdev_simple_match_insert(struct dp_netdev_pmd_thread *pmd,
                                           struct dp_netdev_flow *flow)
@@ -2398,17 +2300,6 @@  do_del_port(struct dp_netdev *dp, struct dp_netdev_port *port)
     seq_change(dp->port_seq);
 
     reconfigure_datapath(dp);
-
-    /* Flush and disable offloads only after 'port' has been made
-     * inaccessible through datapath reconfiguration.
-     * This prevents having PMDs enqueuing offload requests after
-     * the flush.
-     * When only this port is deleted instead of the whole datapath,
-     * revalidator threads are still active and can still enqueue
-     * offload modification or deletion. Managing those stray requests
-     * is done in the offload threads. */
-    dp_netdev_offload_flush(dp, port);
-
     port_destroy(port);
 }
 
@@ -2507,511 +2398,6 @@  dp_netdev_pmd_find_dpcls(struct dp_netdev_pmd_thread *pmd,
     return cls;
 }
 
-#define MAX_FLOW_MARK       (UINT32_MAX - 1)
-#define INVALID_FLOW_MARK   0
-/* Zero flow mark is used to indicate the HW to remove the mark. A packet
- * marked with zero mark is received in SW without a mark at all, so it
- * cannot be used as a valid mark.
- */
-
-struct megaflow_to_mark_data {
-    const struct cmap_node node;
-    ovs_u128 mega_ufid;
-    uint32_t mark;
-};
-
-static struct id_fpool *flow_mark_pool;
-
-static uint32_t
-flow_mark_alloc(void)
-{
-    static struct ovsthread_once init_once = OVSTHREAD_ONCE_INITIALIZER;
-    unsigned int tid = dpdk_offload_thread_id();
-    uint32_t mark;
-
-    if (ovsthread_once_start(&init_once)) {
-        /* Haven't initiated yet, do it here */
-        flow_mark_pool = id_fpool_create(dpdk_offload_thread_nb(),
-                                         1, MAX_FLOW_MARK);
-        ovsthread_once_done(&init_once);
-    }
-
-    if (id_fpool_new_id(flow_mark_pool, tid, &mark)) {
-        return mark;
-    }
-
-    return INVALID_FLOW_MARK;
-}
-
-static void
-flow_mark_free(uint32_t mark)
-{
-    unsigned int tid = dpdk_offload_thread_id();
-
-    id_fpool_free_id(flow_mark_pool, tid, mark);
-}
-
-/* associate megaflow with a mark, which is a 1:1 mapping */
-static void
-megaflow_to_mark_associate(const ovs_u128 *mega_ufid, uint32_t mark)
-{
-    size_t hash = dp_netdev_flow_hash(mega_ufid);
-    struct megaflow_to_mark_data *data = xzalloc(sizeof(*data));
-    unsigned int tid = dpdk_offload_thread_id();
-
-    data->mega_ufid = *mega_ufid;
-    data->mark = mark;
-
-    cmap_insert(&dp_offload_threads[tid].megaflow_to_mark,
-                CONST_CAST(struct cmap_node *, &data->node), hash);
-}
-
-/* disassociate meagaflow with a mark */
-static void
-megaflow_to_mark_disassociate(const ovs_u128 *mega_ufid)
-{
-    size_t hash = dp_netdev_flow_hash(mega_ufid);
-    struct megaflow_to_mark_data *data;
-    unsigned int tid = dpdk_offload_thread_id();
-
-    CMAP_FOR_EACH_WITH_HASH (data, node, hash,
-                             &dp_offload_threads[tid].megaflow_to_mark) {
-        if (ovs_u128_equals(*mega_ufid, data->mega_ufid)) {
-            cmap_remove(&dp_offload_threads[tid].megaflow_to_mark,
-                        CONST_CAST(struct cmap_node *, &data->node), hash);
-            ovsrcu_postpone(free, data);
-            return;
-        }
-    }
-
-    VLOG_WARN("Masked ufid "UUID_FMT" is not associated with a mark?\n",
-              UUID_ARGS((struct uuid *)mega_ufid));
-}
-
-static inline uint32_t
-megaflow_to_mark_find(const ovs_u128 *mega_ufid)
-{
-    size_t hash = dp_netdev_flow_hash(mega_ufid);
-    struct megaflow_to_mark_data *data;
-    unsigned int tid = dpdk_offload_thread_id();
-
-    CMAP_FOR_EACH_WITH_HASH (data, node, hash,
-                             &dp_offload_threads[tid].megaflow_to_mark) {
-        if (ovs_u128_equals(*mega_ufid, data->mega_ufid)) {
-            return data->mark;
-        }
-    }
-
-    VLOG_DBG("Mark id for ufid "UUID_FMT" was not found\n",
-             UUID_ARGS((struct uuid *)mega_ufid));
-    return INVALID_FLOW_MARK;
-}
-
-/* associate mark with a flow, which is 1:N mapping */
-static void
-mark_to_flow_associate(const uint32_t mark, struct dp_netdev_flow *flow)
-{
-    unsigned int tid = dpdk_offload_thread_id();
-    dp_netdev_flow_ref(flow);
-
-    cmap_insert(&dp_offload_threads[tid].mark_to_flow,
-                CONST_CAST(struct cmap_node *, &flow->mark_node),
-                hash_int(mark, 0));
-    flow->mark = mark;
-
-    VLOG_DBG("Associated dp_netdev flow %p with mark %u mega_ufid "UUID_FMT,
-             flow, mark, UUID_ARGS((struct uuid *) &flow->mega_ufid));
-}
-
-static bool
-flow_mark_has_no_ref(uint32_t mark)
-{
-    unsigned int tid = dpdk_offload_thread_id();
-    struct dp_netdev_flow *flow;
-
-    CMAP_FOR_EACH_WITH_HASH (flow, mark_node, hash_int(mark, 0),
-                             &dp_offload_threads[tid].mark_to_flow) {
-        if (flow->mark == mark) {
-            return false;
-        }
-    }
-
-    return true;
-}
-
-static int
-mark_to_flow_disassociate(struct dp_netdev *dp,
-                          struct dp_netdev_flow *flow)
-{
-    struct cmap_node *mark_node = CONST_CAST(struct cmap_node *,
-                                             &flow->mark_node);
-    unsigned int tid = dpdk_offload_thread_id();
-    uint32_t mark = flow->mark;
-    int ret = 0;
-
-    /* INVALID_FLOW_MARK may mean that the flow has been disassociated or
-     * never associated. */
-    if (OVS_UNLIKELY(mark == INVALID_FLOW_MARK)) {
-        return EINVAL;
-    }
-
-    cmap_remove(&dp_offload_threads[tid].mark_to_flow,
-                mark_node, hash_int(mark, 0));
-    flow->mark = INVALID_FLOW_MARK;
-
-    /*
-     * no flow is referencing the mark any more? If so, let's
-     * remove the flow from hardware and free the mark.
-     */
-    if (flow_mark_has_no_ref(mark)) {
-        struct netdev *port;
-        odp_port_t in_port = flow->flow.in_port.odp_port;
-
-        port = dpif_netdev_offload_get_netdev_by_port_id(in_port);
-        if (port) {
-            /* Taking a global 'port_rwlock' to fulfill thread safety
-             * restrictions regarding netdev port mapping. */
-            ovs_rwlock_rdlock(&dp->port_rwlock);
-            ret = netdev_offload_dpdk_flow_del(port, &flow->mega_ufid, NULL);
-            ovs_rwlock_unlock(&dp->port_rwlock);
-        }
-
-        flow_mark_free(mark);
-        VLOG_DBG("Freed flow mark %u mega_ufid "UUID_FMT, mark,
-                 UUID_ARGS((struct uuid *) &flow->mega_ufid));
-
-        megaflow_to_mark_disassociate(&flow->mega_ufid);
-    }
-    dp_netdev_flow_unref(flow);
-
-    return ret;
-}
-
-static struct dp_netdev_flow *
-mark_to_flow_find(const struct dp_netdev_pmd_thread *pmd,
-                  const uint32_t mark)
-{
-    struct dp_netdev_flow *flow;
-    unsigned int tid;
-    size_t hash;
-
-    if (dp_offload_threads == NULL) {
-        return NULL;
-    }
-
-    hash = hash_int(mark, 0);
-    for (tid = 0; tid < dpdk_offload_thread_nb(); tid++) {
-        CMAP_FOR_EACH_WITH_HASH (flow, mark_node, hash,
-                                 &dp_offload_threads[tid].mark_to_flow) {
-            if (flow->mark == mark && flow->pmd_id == pmd->core_id &&
-                flow->dead == false) {
-                return flow;
-            }
-        }
-    }
-
-    return NULL;
-}
-
-static struct dp_offload_thread_item *
-dp_netdev_alloc_flow_offload(struct dp_netdev *dp,
-                             struct dp_netdev_flow *flow,
-                             int op)
-{
-    struct dp_offload_thread_item *item;
-    struct dp_offload_flow_item *flow_offload;
-
-    item = xzalloc(sizeof *item + sizeof *flow_offload);
-    flow_offload = &item->data->flow;
-
-    item->type = DP_OFFLOAD_FLOW;
-    item->dp = dp;
-
-    flow_offload->flow = flow;
-    flow_offload->op = op;
-
-    dp_netdev_flow_ref(flow);
-
-    return item;
-}
-
-static void
-dp_netdev_free_flow_offload__(struct dp_offload_thread_item *offload)
-{
-    struct dp_offload_flow_item *flow_offload = &offload->data->flow;
-
-    free(flow_offload->actions);
-    free(offload);
-}
-
-static void
-dp_netdev_free_flow_offload(struct dp_offload_thread_item *offload)
-{
-    struct dp_offload_flow_item *flow_offload = &offload->data->flow;
-
-    dp_netdev_flow_unref(flow_offload->flow);
-    ovsrcu_postpone(dp_netdev_free_flow_offload__, offload);
-}
-
-static void
-dp_netdev_free_offload(struct dp_offload_thread_item *offload)
-{
-    switch (offload->type) {
-    case DP_OFFLOAD_FLOW:
-        dp_netdev_free_flow_offload(offload);
-        break;
-    case DP_OFFLOAD_FLUSH:
-        free(offload);
-        break;
-    default:
-        OVS_NOT_REACHED();
-    };
-}
-
-static void
-dp_netdev_append_offload(struct dp_offload_thread_item *offload,
-                         unsigned int tid)
-{
-    dp_netdev_offload_init();
-
-    mpsc_queue_insert(&dp_offload_threads[tid].queue, &offload->node);
-    atomic_count_inc64(&dp_offload_threads[tid].enqueued_item);
-}
-
-static void
-dp_netdev_offload_flow_enqueue(struct dp_offload_thread_item *item)
-{
-    struct dp_offload_flow_item *flow_offload = &item->data->flow;
-    unsigned int tid;
-
-    ovs_assert(item->type == DP_OFFLOAD_FLOW);
-
-    tid = dpdk_offload_ufid_to_thread_id(flow_offload->flow->mega_ufid);
-    dp_netdev_append_offload(item, tid);
-}
-
-static int
-dp_netdev_flow_offload_del(struct dp_offload_thread_item *item)
-{
-    return mark_to_flow_disassociate(item->dp, item->data->flow.flow);
-}
-
-/*
- * There are two flow offload operations here: addition and modification.
- *
- * For flow addition, this function does:
- * - allocate a new flow mark id
- * - perform hardware flow offload
- * - associate the flow mark with flow and mega flow
- *
- * For flow modification, both flow mark and the associations are still
- * valid, thus only item 2 needed.
- */
-static int
-dp_netdev_flow_offload_put(struct dp_offload_thread_item *item)
-{
-    struct dp_offload_flow_item *offload = &item->data->flow;
-    struct dp_netdev *dp = item->dp;
-    struct dp_netdev_flow *flow = offload->flow;
-    odp_port_t in_port = flow->flow.in_port.odp_port;
-    bool modification = offload->op == DP_NETDEV_FLOW_OFFLOAD_OP_MOD
-                        && flow->mark != INVALID_FLOW_MARK;
-    struct dpif_netdev_offload_info info;
-    struct netdev *port;
-    uint32_t mark;
-    int ret;
-
-    if (flow->dead) {
-        return -1;
-    }
-
-    if (modification) {
-        mark = flow->mark;
-    } else {
-        /*
-         * If a mega flow has already been offloaded (from other PMD
-         * instances), do not offload it again.
-         */
-        mark = megaflow_to_mark_find(&flow->mega_ufid);
-        if (mark != INVALID_FLOW_MARK) {
-            VLOG_DBG("Flow has already been offloaded with mark %u\n", mark);
-            if (flow->mark != INVALID_FLOW_MARK) {
-                ovs_assert(flow->mark == mark);
-            } else {
-                mark_to_flow_associate(mark, flow);
-            }
-            return 0;
-        }
-
-        mark = flow_mark_alloc();
-        if (mark == INVALID_FLOW_MARK) {
-            VLOG_ERR("Failed to allocate flow mark!\n");
-            return -1;
-        }
-    }
-    info.flow_mark = mark;
-    info.orig_in_port = offload->orig_in_port;
-
-    port = dpif_netdev_offload_get_netdev_by_port_id(in_port);
-    if (!port) {
-        goto err_free;
-    }
-
-    /* Taking a global 'port_rwlock' to fulfill thread safety
-     * restrictions regarding the netdev port mapping. */
-    ovs_rwlock_rdlock(&dp->port_rwlock);
-    ret = netdev_offload_dpdk_flow_put(
-        port, &offload->match, CONST_CAST(struct nlattr *, offload->actions),
-        offload->actions_len, &flow->mega_ufid, &info, NULL);
-    ovs_rwlock_unlock(&dp->port_rwlock);
-
-    if (ret) {
-        goto err_free;
-    }
-
-    if (!modification) {
-        megaflow_to_mark_associate(&flow->mega_ufid, mark);
-        mark_to_flow_associate(mark, flow);
-    }
-    return 0;
-
-err_free:
-    if (!modification) {
-        flow_mark_free(mark);
-    } else {
-        mark_to_flow_disassociate(item->dp, flow);
-    }
-    return -1;
-}
-
-static void
-dp_offload_flow(struct dp_offload_thread_item *item)
-{
-    struct dp_offload_flow_item *flow_offload = &item->data->flow;
-    const char *op;
-    int ret;
-
-    switch (flow_offload->op) {
-    case DP_NETDEV_FLOW_OFFLOAD_OP_ADD:
-        op = "add";
-        ret = dp_netdev_flow_offload_put(item);
-        break;
-    case DP_NETDEV_FLOW_OFFLOAD_OP_MOD:
-        op = "modify";
-        ret = dp_netdev_flow_offload_put(item);
-        break;
-    case DP_NETDEV_FLOW_OFFLOAD_OP_DEL:
-        op = "delete";
-        ret = dp_netdev_flow_offload_del(item);
-        break;
-    default:
-        OVS_NOT_REACHED();
-    }
-
-    VLOG_DBG("%s to %s netdev flow "UUID_FMT,
-             ret == 0 ? "succeed" : "failed", op,
-             UUID_ARGS((struct uuid *) &flow_offload->flow->mega_ufid));
-}
-
-static void
-dp_offload_flush(struct dp_offload_thread_item *item)
-{
-    struct dp_offload_flush_item *flush = &item->data->flush;
-
-    ovs_rwlock_rdlock(&item->dp->port_rwlock);
-    dpif_offload_netdev_flush_flows(flush->netdev);
-    ovs_rwlock_unlock(&item->dp->port_rwlock);
-
-    ovs_barrier_block(flush->barrier);
-
-    /* Allow the initiator thread to take again the port lock,
-     * before continuing offload operations in this thread.
-     */
-    ovs_barrier_block(flush->barrier);
-}
-
-#define DP_NETDEV_OFFLOAD_BACKOFF_MIN 1
-#define DP_NETDEV_OFFLOAD_BACKOFF_MAX 64
-#define DP_NETDEV_OFFLOAD_QUIESCE_INTERVAL_US (10 * 1000) /* 10 ms */
-
-static void *
-dp_netdev_flow_offload_main(void *arg)
-{
-    struct dp_offload_thread *ofl_thread = arg;
-    struct dp_offload_thread_item *offload;
-    struct mpsc_queue_node *node;
-    struct mpsc_queue *queue;
-    long long int latency_us;
-    long long int next_rcu;
-    long long int now;
-    uint64_t backoff;
-
-    queue = &ofl_thread->queue;
-    mpsc_queue_acquire(queue);
-
-    while (true) {
-        backoff = DP_NETDEV_OFFLOAD_BACKOFF_MIN;
-        while (mpsc_queue_tail(queue) == NULL) {
-            xnanosleep(backoff * 1E6);
-            if (backoff < DP_NETDEV_OFFLOAD_BACKOFF_MAX) {
-                backoff <<= 1;
-            }
-        }
-
-        next_rcu = time_usec() + DP_NETDEV_OFFLOAD_QUIESCE_INTERVAL_US;
-        MPSC_QUEUE_FOR_EACH_POP (node, queue) {
-            offload = CONTAINER_OF(node, struct dp_offload_thread_item, node);
-            atomic_count_dec64(&ofl_thread->enqueued_item);
-
-            switch (offload->type) {
-            case DP_OFFLOAD_FLOW:
-                dp_offload_flow(offload);
-                break;
-            case DP_OFFLOAD_FLUSH:
-                dp_offload_flush(offload);
-                break;
-            default:
-                OVS_NOT_REACHED();
-            }
-
-            now = time_usec();
-
-            latency_us = now - offload->timestamp;
-            mov_avg_cma_update(&ofl_thread->cma, latency_us);
-            mov_avg_ema_update(&ofl_thread->ema, latency_us);
-
-            dp_netdev_free_offload(offload);
-
-            /* Do RCU synchronization at fixed interval. */
-            if (now > next_rcu) {
-                ovsrcu_quiesce();
-                next_rcu = time_usec() + DP_NETDEV_OFFLOAD_QUIESCE_INTERVAL_US;
-            }
-        }
-    }
-
-    OVS_NOT_REACHED();
-    mpsc_queue_release(queue);
-
-    return NULL;
-}
-
-static void
-queue_netdev_flow_del(struct dp_netdev_pmd_thread *pmd,
-                      struct dp_netdev_flow *flow)
-{
-    struct dp_offload_thread_item *offload;
-
-    if (!dpif_offload_is_offload_enabled()) {
-        return;
-    }
-
-    offload = dp_netdev_alloc_flow_offload(pmd->dp, flow,
-                                           DP_NETDEV_FLOW_OFFLOAD_OP_DEL);
-    offload->timestamp = pmd->ctx.now;
-    dp_netdev_offload_flow_enqueue(offload);
-}
-
 static void
 log_netdev_flow_change(const struct dp_netdev_flow *flow,
                        const struct match *match,
@@ -3077,137 +2463,171 @@  log_netdev_flow_change(const struct dp_netdev_flow *flow,
     ds_destroy(&ds);
 }
 
-static void
-queue_netdev_flow_put(struct dp_netdev_pmd_thread *pmd,
-                      struct dp_netdev_flow *flow, struct match *match,
-                      const struct nlattr *actions, size_t actions_len,
-                      int op)
+/* Offloaded flows can be handled asynchronously, so we do not always know
+ * whether a specific flow is offloaded or not.  It might still be pending;
+ * in fact, multiple modifications can be pending, and the actual offload
+ * state depends on the completion of each modification.
+ *
+ * To correctly determine whether a flow is offloaded when it is being
+ * destroyed (and therefore requires cleanup), we must ensure that all
+ * operations have completed.  To achieve this, we track the number of
+ * outstanding offloaded flow modifications. */
+static bool
+offload_queue_inc(struct dp_netdev_flow *flow)
 {
-    struct dp_offload_thread_item *item;
-    struct dp_offload_flow_item *flow_offload;
+    int current;
 
-    if (!dpif_offload_is_offload_enabled()) {
-        return;
+    while (true) {
+        atomic_read(&flow->offload_queue_depth, &current);
+        if (current < 0) {
+            /* We are cleaning up, so no longer enqueue operations. */
+            return false;
+        }
+
+        /* Here we try to atomically increase the value.  If we do not succeed,
+         * someone else has modified it, and we need to check again for a
+         * current negative value. */
+        if (atomic_compare_exchange_strong(&flow->offload_queue_depth,
+                                           &current, current + 1)) {
+            return true;
+        }
     }
+}
 
-    item = dp_netdev_alloc_flow_offload(pmd->dp, flow, op);
-    flow_offload = &item->data->flow;
-    flow_offload->match = *match;
-    flow_offload->actions = xmalloc(actions_len);
-    nullable_memcpy(flow_offload->actions, actions, actions_len);
-    flow_offload->actions_len = actions_len;
-    flow_offload->orig_in_port = flow->orig_in_port;
+static bool
+offload_queue_dec(struct dp_netdev_flow *flow)
+{
+    int old;
 
-    item->timestamp = pmd->ctx.now;
-    dp_netdev_offload_flow_enqueue(item);
+    atomic_sub(&flow->offload_queue_depth, 1, &old);
+    ovs_assert(old >= 1);
+
+    if (old == 1) {
+        /* Note that this only indicates that the queue might be empty. */
+        return true;
+    }
+    return false;
 }
 
-static void
-dp_netdev_pmd_remove_flow(struct dp_netdev_pmd_thread *pmd,
-                          struct dp_netdev_flow *flow)
-    OVS_REQUIRES(pmd->flow_mutex)
+static bool
+offload_queue_complete(struct dp_netdev_flow *flow)
 {
-    struct cmap_node *node = CONST_CAST(struct cmap_node *, &flow->node);
-    struct dpcls *cls;
-    odp_port_t in_port = flow->flow.in_port.odp_port;
-
-    cls = dp_netdev_pmd_lookup_dpcls(pmd, in_port);
-    ovs_assert(cls != NULL);
-    dpcls_remove(cls, &flow->cr);
-    dp_netdev_simple_match_remove(pmd, flow);
-    cmap_remove(&pmd->flow_table, node, dp_netdev_flow_hash(&flow->ufid));
-    ccmap_dec(&pmd->n_flows, odp_to_u32(in_port));
-    queue_netdev_flow_del(pmd, flow);
-    flow->dead = true;
+    /* This function returns false if the queue is still in use.
+     * If the queue is empty, it will attempt to atomically mark it as
+     * 'not in use' by making the queue depth negative.  This prevents
+     * other flow operations from being added.  If successful, it returns
+     * true. */
+     int expected_val = 0;
 
-    dp_netdev_flow_unref(flow);
+    return atomic_compare_exchange_strong(&flow->offload_queue_depth,
+                                          &expected_val, -1);
 }
 
 static void
-dp_netdev_offload_flush_enqueue(struct dp_netdev *dp,
-                                struct netdev *netdev,
-                                struct ovs_barrier *barrier)
+offload_flow_reference_unreference_cb(unsigned pmd_id OVS_UNUSED,
+                                      void *flow_reference_)
 {
-    unsigned int tid;
-    long long int now_us = time_usec();
+    struct dp_netdev_flow *flow_reference = flow_reference_;
 
-    for (tid = 0; tid < dpdk_offload_thread_nb(); tid++) {
-        struct dp_offload_thread_item *item;
-        struct dp_offload_flush_item *flush;
+    if (flow_reference) {
+        flow_reference->offloaded = false;
+        dp_netdev_flow_unref(flow_reference);
+    }
+}
 
-        item = xmalloc(sizeof *item + sizeof *flush);
-        item->type = DP_OFFLOAD_FLUSH;
-        item->dp = dp;
-        item->timestamp = now_us;
+static void
+offload_flow_del_resume(struct dp_netdev_flow *flow_reference,
+                        int error)
+{
+    if (error == EINPROGRESS) {
+        return;
+    }
 
-        flush = &item->data->flush;
-        flush->netdev = netdev;
-        flush->barrier = barrier;
+    if (error) {
+        odp_port_t in_port = flow_reference->flow.in_port.odp_port;
 
-        dp_netdev_append_offload(item, tid);
+        VLOG_DBG(
+            "Failed removing offload flow ufid " UUID_FMT " from port %d: %d",
+            UUID_ARGS((struct uuid *)&flow_reference->mega_ufid), in_port,
+            error);
+    } else {
+        /* Release because we successfully removed the reference. */
+        dp_netdev_flow_unref(flow_reference);
     }
+
+    /* Release as we took a reference in offload_flow_del(). */
+    dp_netdev_flow_unref(flow_reference);
 }
 
-/* Blocking call that will wait on the offload thread to
- * complete its work.  As the flush order will only be
- * enqueued after existing offload requests, those previous
- * offload requests must be processed, which requires being
- * able to lock the 'port_rwlock' from the offload thread.
- *
- * Flow offload flush is done when a port is being deleted.
- * Right after this call executes, the offload API is disabled
- * for the port. This call must be made blocking until the
- * offload provider completed its job.
- */
 static void
-dp_netdev_offload_flush(struct dp_netdev *dp,
-                        struct dp_netdev_port *port)
-    OVS_REQ_WRLOCK(dp->port_rwlock)
+offload_flow_del_resume_cb(void *aux OVS_UNUSED,
+                           struct dpif_flow_stats *stats OVS_UNUSED,
+                           unsigned pmd_id OVS_UNUSED,
+                           void *flow_reference,
+                           void *previous_flow_reference OVS_UNUSED, int error)
 {
-    /* The flush mutex serves to exclude mutual access to the static
-     * barrier, and to prevent multiple flush orders to several threads.
-     *
-     * The memory barrier needs to go beyond the function scope as
-     * the other threads can resume from blocking after this function
-     * already finished.
-     *
-     * Additionally, because the flush operation is blocking, it would
-     * deadlock if multiple offload threads were blocking on several
-     * different barriers. Only allow a single flush order in the offload
-     * queue at a time.
-     */
-    static struct ovs_mutex flush_mutex = OVS_MUTEX_INITIALIZER;
-    static struct ovs_barrier barrier OVS_GUARDED_BY(flush_mutex);
-    struct netdev *netdev;
+    offload_flow_del_resume(flow_reference, error);
+}
+
+static void
+offload_flow_del(struct dp_netdev *dp, unsigned pmd_id,
+                 struct dp_netdev_flow *flow)
+{
+    odp_port_t in_port = flow->flow.in_port.odp_port;
+    struct dpif_offload_flow_del del = {
+        .in_port = in_port,
+        .pmd_id = pmd_id,
+        .ufid = CONST_CAST(ovs_u128 *, &flow->mega_ufid),
+        .flow_reference = flow,
+        .stats = NULL,
+        .cb_data = { .callback = offload_flow_del_resume_cb },
+    };
+    int error;
 
     if (!dpif_offload_is_offload_enabled()) {
         return;
     }
 
-    ovs_rwlock_unlock(&dp->port_rwlock);
-    ovs_mutex_lock(&flush_mutex);
+    /* This offload flow delete is only called when the actual flow is
+     * destructed.  However, we can only trust the state of flow->offloaded
+     * if no more flow_put operations are pending.  Below, we check whether
+     * the queue can be marked as complete, and then determine if we need
+     * to schedule a removal.  If not, the delete will be rescheduled later
+     * in the last offload_flow_put_resume_cb() callback. */
+    ovs_assert(flow->dead);
+    if (!offload_queue_complete(flow) || !flow->offloaded) {
+        return;
+    }
 
-    /* This thread and the offload threads. */
-    ovs_barrier_init(&barrier, 1 + dpdk_offload_thread_nb());
+    flow->offloaded = false;
+    dp_netdev_flow_ref(flow);
 
-    netdev = netdev_ref(port->netdev);
-    dp_netdev_offload_flush_enqueue(dp, netdev, &barrier);
-    ovs_barrier_block(&barrier);
-    netdev_close(netdev);
+    /* It's the responsibility of the offload provider to remove the
+     * actual rule from hardware only if none of the other PMD threads
+     * have the rule installed in hardware. */
+    error = dpif_offload_datapath_flow_del(dp->full_name, &del);
+    offload_flow_del_resume(flow, error);
+}
 
-    /* Take back the datapath port lock before allowing the offload
-     * threads to proceed further. The port deletion must complete first,
-     * to ensure no further offloads are inserted after the flush.
-     *
-     * Some offload provider (e.g. DPDK) keeps a netdev reference with
-     * the offload data. If this reference is not closed, the netdev is
-     * kept indefinitely. */
-    ovs_rwlock_wrlock(&dp->port_rwlock);
+static void
+dp_netdev_pmd_remove_flow(struct dp_netdev_pmd_thread *pmd,
+                          struct dp_netdev_flow *flow)
+    OVS_REQUIRES(pmd->flow_mutex)
+{
+    struct cmap_node *node = CONST_CAST(struct cmap_node *, &flow->node);
+    struct dpcls *cls;
+    odp_port_t in_port = flow->flow.in_port.odp_port;
 
-    ovs_barrier_block(&barrier);
-    ovs_barrier_destroy(&barrier);
+    cls = dp_netdev_pmd_lookup_dpcls(pmd, in_port);
+    ovs_assert(cls != NULL);
+    dpcls_remove(cls, &flow->cr);
+    dp_netdev_simple_match_remove(pmd, flow);
+    cmap_remove(&pmd->flow_table, node, dp_netdev_flow_hash(&flow->ufid));
+    ccmap_dec(&pmd->n_flows, odp_to_u32(in_port));
+    flow->dead = true;
+    offload_flow_del(pmd->dp, pmd->core_id, flow);
 
-    ovs_mutex_unlock(&flush_mutex);
+    dp_netdev_flow_unref(flow);
 }
 
 static void
@@ -3649,112 +3069,7 @@  dp_netdev_pmd_find_flow(const struct dp_netdev_pmd_thread *pmd,
 }
 
 static void
-dp_netdev_flow_set_last_stats_attrs(struct dp_netdev_flow *netdev_flow,
-                                    const struct dpif_flow_stats *stats,
-                                    const struct dpif_flow_attrs *attrs,
-                                    int result)
-{
-    struct dp_netdev_flow_stats *last_stats = &netdev_flow->last_stats;
-    struct dp_netdev_flow_attrs *last_attrs = &netdev_flow->last_attrs;
-
-    atomic_store_relaxed(&netdev_flow->netdev_flow_get_result, result);
-    if (result) {
-        return;
-    }
-
-    atomic_store_relaxed(&last_stats->used,         stats->used);
-    atomic_store_relaxed(&last_stats->packet_count, stats->n_packets);
-    atomic_store_relaxed(&last_stats->byte_count,   stats->n_bytes);
-    atomic_store_relaxed(&last_stats->tcp_flags,    stats->tcp_flags);
-
-    atomic_store_relaxed(&last_attrs->offloaded,    attrs->offloaded);
-    atomic_store_relaxed(&last_attrs->dp_layer,     attrs->dp_layer);
-
-}
-
-static void
-dp_netdev_flow_get_last_stats_attrs(struct dp_netdev_flow *netdev_flow,
-                                    struct dpif_flow_stats *stats,
-                                    struct dpif_flow_attrs *attrs,
-                                    int *result)
-{
-    struct dp_netdev_flow_stats *last_stats = &netdev_flow->last_stats;
-    struct dp_netdev_flow_attrs *last_attrs = &netdev_flow->last_attrs;
-
-    atomic_read_relaxed(&netdev_flow->netdev_flow_get_result, result);
-    if (*result) {
-        return;
-    }
-
-    atomic_read_relaxed(&last_stats->used,         &stats->used);
-    atomic_read_relaxed(&last_stats->packet_count, &stats->n_packets);
-    atomic_read_relaxed(&last_stats->byte_count,   &stats->n_bytes);
-    atomic_read_relaxed(&last_stats->tcp_flags,    &stats->tcp_flags);
-
-    atomic_read_relaxed(&last_attrs->offloaded,    &attrs->offloaded);
-    atomic_read_relaxed(&last_attrs->dp_layer,     &attrs->dp_layer);
-}
-
-static bool
-dpif_netdev_get_flow_offload_status(const struct dp_netdev *dp,
-                                    struct dp_netdev_flow *netdev_flow,
-                                    struct dpif_flow_stats *stats,
-                                    struct dpif_flow_attrs *attrs)
-{
-    uint64_t act_buf[1024 / 8];
-    struct nlattr *actions;
-    struct netdev *netdev;
-    struct match match;
-    struct ofpbuf buf;
-
-    int ret = 0;
-
-    if (!dpif_offload_is_offload_enabled()) {
-        return false;
-    }
-
-    netdev = dpif_netdev_offload_get_netdev_by_port_id(
-        netdev_flow->flow.in_port.odp_port);
-    if (!netdev) {
-        return false;
-    }
-    ofpbuf_use_stack(&buf, &act_buf, sizeof act_buf);
-    /* Taking a global 'port_rwlock' to fulfill thread safety
-     * restrictions regarding netdev port mapping.
-     *
-     * XXX: Main thread will try to pause/stop all revalidators during datapath
-     *      reconfiguration via datapath purge callback (dp_purge_cb) while
-     *      rw-holding 'dp->port_rwlock'.  So we're not waiting for lock here.
-     *      Otherwise, deadlock is possible, because revalidators might sleep
-     *      waiting for the main thread to release the lock and main thread
-     *      will wait for them to stop processing.
-     *      This workaround might make statistics less accurate. Especially
-     *      for flow deletion case, since there will be no other attempt.  */
-    if (!ovs_rwlock_tryrdlock(&dp->port_rwlock)) {
-        ret = netdev_offload_dpdk_flow_get(netdev, &match, &actions,
-                                           &netdev_flow->mega_ufid, stats,
-                                           attrs, &buf);
-        /* Storing statistics and attributes from the last request for
-         * later use on mutex contention. */
-        dp_netdev_flow_set_last_stats_attrs(netdev_flow, stats, attrs, ret);
-        ovs_rwlock_unlock(&dp->port_rwlock);
-    } else {
-        dp_netdev_flow_get_last_stats_attrs(netdev_flow, stats, attrs, &ret);
-        if (!ret && !attrs->dp_layer) {
-            /* Flow was never reported as 'offloaded' so it's harmless
-             * to continue to think so. */
-            ret = EAGAIN;
-        }
-    }
-    if (ret) {
-        return false;
-    }
-
-    return true;
-}
-
-static void
-get_dpif_flow_status(const struct dp_netdev *dp,
+get_dpif_flow_status(const struct dp_netdev *dp OVS_UNUSED,
                      const struct dp_netdev_flow *netdev_flow_,
                      struct dpif_flow_stats *stats,
                      struct dpif_flow_attrs *attrs)
@@ -3777,8 +3092,10 @@  get_dpif_flow_status(const struct dp_netdev *dp,
     atomic_read_relaxed(&netdev_flow->stats.tcp_flags, &flags);
     stats->tcp_flags = flags;
 
-    if (dpif_netdev_get_flow_offload_status(dp, netdev_flow,
-                                            &offload_stats, &offload_attrs)) {
+    if (dpif_offload_datapath_flow_stats(dp->full_name,
+                                         netdev_flow->flow.in_port.odp_port,
+                                         &netdev_flow->mega_ufid,
+                                         &offload_stats, &offload_attrs)) {
         stats->n_packets += offload_stats.n_packets;
         stats->n_bytes += offload_stats.n_bytes;
         stats->used = MAX(stats->used, offload_stats.used);
@@ -4149,6 +3466,100 @@  dp_netdev_flow_is_simple_match(const struct match *match)
     return true;
 }
 
+static void
+offload_flow_put_resume(struct dp_netdev *dp, struct dp_netdev_flow *flow,
+                        struct dp_netdev_flow *previous_flow_reference,
+                        unsigned pmd_id, int error)
+{
+    if (error == EINPROGRESS) {
+        return;
+    }
+
+    if (!error) {
+        flow->offloaded = true;
+    } else {
+        /* If the flow was already offloaded, the new action set can no
+         * longer be offloaded.  In theory, we should disassociate the
+         * offload from all PMDs that have this flow marked as offloaded.
+         * Unfortunately, there is no mechanism to inform other PMDs, so
+         * we cannot explicitly mark such flows.  This situation typically
+         * occurs when the revalidator modifies the flow, so it is safe to
+         * assume it will update all affected flows and that the offload
+         * will subsequently fail. */
+        flow->offloaded = false;
+
+        /* On error, the flow reference was not stored by the offload provider,
+         * so we should decrease the reference. */
+        dp_netdev_flow_unref(flow);
+    }
+
+    if (offload_queue_dec(flow) && flow->dead) {
+        /* If flows are processed asynchronously, modifications might
+         * still be queued up while the flow is being removed.  If this
+         * was the last queued operation on a dead flow, we try again
+         * to see if we need to remove this flow. */
+        offload_flow_del(dp, pmd_id, flow);
+    }
+
+    if (previous_flow_reference) {
+        dp_netdev_flow_unref(previous_flow_reference);
+        if (previous_flow_reference != flow) {
+            VLOG_DBG("Updated flow reference was from outdated flow");
+        }
+    }
+}
+
+static void
+offload_flow_put_resume_cb(void *aux, struct dpif_flow_stats *stats OVS_UNUSED,
+                           unsigned pmd_id, void *flow_reference_,
+                           void *old_flow_reference_,
+                           int error)
+{
+    struct dp_netdev *dp = aux;
+    struct dp_netdev_flow *flow_reference = flow_reference_;
+    struct dp_netdev_flow *old_flow_reference = old_flow_reference_;
+
+    offload_flow_put_resume(dp, flow_reference, old_flow_reference,
+                            pmd_id, error);
+}
+
+static void
+offload_flow_put(struct dp_netdev_pmd_thread *pmd, struct dp_netdev_flow *flow,
+                 struct match *match, const struct nlattr *actions,
+                 size_t actions_len)
+{
+    struct dpif_offload_flow_put put = {
+        .in_port = match->flow.in_port.odp_port,
+        .orig_in_port = flow->orig_in_port,
+        .pmd_id = pmd->core_id,
+        .ufid = CONST_CAST(ovs_u128 *, &flow->mega_ufid),
+        .match = match,
+        .actions = actions,
+        .actions_len = actions_len,
+        .stats = NULL,
+        .flow_reference = flow,
+        .cb_data = {
+            .callback = offload_flow_put_resume_cb,
+            .callback_aux = pmd->dp,
+        },
+    };
+    void *previous_flow_reference = NULL;
+    int error;
+
+    if (!dpif_offload_is_offload_enabled() || flow->dead
+        || !offload_queue_inc(flow)) {
+        return;
+    }
+
+    dp_netdev_flow_ref(flow);
+
+    error = dpif_offload_datapath_flow_put(pmd->dp->full_name, &put,
+                                           &previous_flow_reference);
+    offload_flow_put_resume(pmd->dp, put.flow_reference,
+                            previous_flow_reference,
+                            pmd->core_id, error);
+}
+
 static struct dp_netdev_flow *
 dp_netdev_flow_add(struct dp_netdev_pmd_thread *pmd,
                    struct match *match, const ovs_u128 *ufid,
@@ -4185,12 +3596,10 @@  dp_netdev_flow_add(struct dp_netdev_pmd_thread *pmd,
     /* Do not allocate extra space. */
     flow = xmalloc(sizeof *flow - sizeof flow->cr.flow.mf + mask.len);
     memset(&flow->stats, 0, sizeof flow->stats);
-    atomic_init(&flow->netdev_flow_get_result, 0);
-    memset(&flow->last_stats, 0, sizeof flow->last_stats);
-    memset(&flow->last_attrs, 0, sizeof flow->last_attrs);
     flow->dead = false;
+    flow->offloaded = false;
+    atomic_init(&flow->offload_queue_depth, 0);
     flow->batch = NULL;
-    flow->mark = INVALID_FLOW_MARK;
     flow->orig_in_port = orig_in_port;
     *CONST_CAST(unsigned *, &flow->pmd_id) = pmd->core_id;
     *CONST_CAST(struct flow *, &flow->flow) = match->flow;
@@ -4225,8 +3634,7 @@  dp_netdev_flow_add(struct dp_netdev_pmd_thread *pmd,
         dp_netdev_simple_match_insert(pmd, flow);
     }
 
-    queue_netdev_flow_put(pmd, flow, match, actions, actions_len,
-                          DP_NETDEV_FLOW_OFFLOAD_OP_ADD);
+    offload_flow_put(pmd, flow, match, actions, actions_len);
     log_netdev_flow_change(flow, match, NULL, actions, actions_len);
 
     return flow;
@@ -4286,9 +3694,8 @@  flow_put_on_pmd(struct dp_netdev_pmd_thread *pmd,
             old_actions = dp_netdev_flow_get_actions(netdev_flow);
             ovsrcu_set(&netdev_flow->actions, new_actions);
 
-            queue_netdev_flow_put(pmd, netdev_flow, match,
-                                  put->actions, put->actions_len,
-                                  DP_NETDEV_FLOW_OFFLOAD_OP_MOD);
+            offload_flow_put(pmd, netdev_flow, match, put->actions,
+                             put->actions_len);
             log_netdev_flow_change(netdev_flow, match, old_actions,
                                    put->actions, put->actions_len);
 
@@ -5141,6 +4548,11 @@  dpif_netdev_set_config(struct dpif *dpif, const struct smap *other_config)
         log_all_pmd_sleeps(dp);
     }
 
+    if (first_set_config) {
+        dpif_offload_datapath_register_flow_unreference_cb(
+            dpif, offload_flow_reference_unreference_cb);
+    }
+
     first_set_config = false;
     return 0;
 }
@@ -8329,35 +7741,32 @@  dp_netdev_hw_flow(const struct dp_netdev_pmd_thread *pmd,
                   struct dp_packet *packet,
                   struct dp_netdev_flow **flow)
 {
-    uint32_t mark;
-
-#ifdef ALLOW_EXPERIMENTAL_API /* Packet restoration API required. */
-    /* Restore the packet if HW processing was terminated before completion. */
     struct dp_netdev_rxq *rxq = pmd->ctx.last_rxq;
     bool postprocess_api_supported;
+    void *flow_reference = NULL;
+    int err;
 
     atomic_read_relaxed(&rxq->port->netdev->hw_info.postprocess_api_supported,
                         &postprocess_api_supported);
-    if (postprocess_api_supported) {
-        int err = dpif_offload_netdev_hw_miss_packet_postprocess(
-            rxq->port->netdev, packet);
 
-        if (err && err != EOPNOTSUPP) {
-            if (err != ECANCELED) {
-                COVERAGE_INC(datapath_drop_hw_miss_postprocess);
-            }
-            return -1;
-        }
-    }
-#endif
-
-    /* If no mark, no flow to find. */
-    if (!dp_packet_has_flow_mark(packet, &mark)) {
+    if (!postprocess_api_supported) {
         *flow = NULL;
         return 0;
     }
 
-    *flow = mark_to_flow_find(pmd, mark);
+    err = dpif_offload_netdev_hw_miss_packet_postprocess(rxq->port->netdev,
+                                                         pmd->core_id,
+                                                         packet,
+                                                         &flow_reference);
+
+    if (err && err != EOPNOTSUPP) {
+        if (err != ECANCELED) {
+            COVERAGE_INC(datapath_drop_hw_miss_postprocess);
+        }
+        return -1;
+    }
+
+    *flow = flow_reference;
     return 0;
 }
 
@@ -8413,7 +7822,7 @@  dfc_processing(struct dp_netdev_pmd_thread *pmd,
                size_t *n_flows, uint8_t *index_map,
                bool md_is_valid, odp_port_t port_no)
 {
-    const bool netdev_flow_api = dpif_offload_is_offload_enabled();
+    const bool offload_enabled = dpif_offload_is_offload_enabled();
     const uint32_t recirc_depth = *recirc_depth_get();
     const size_t cnt = dp_packet_batch_size(packets_);
     size_t n_missed = 0, n_emc_hit = 0, n_phwol_hit = 0;
@@ -8457,7 +7866,7 @@  dfc_processing(struct dp_netdev_pmd_thread *pmd,
             pkt_metadata_init(&packet->md, port_no);
         }
 
-        if (netdev_flow_api && recirc_depth == 0) {
+        if (offload_enabled && recirc_depth == 0) {
             if (OVS_UNLIKELY(dp_netdev_hw_flow(pmd, packet, &flow))) {
                 /* Packet restoration failed and it was dropped, do not
                  * continue processing.
@@ -10472,85 +9881,3 @@  dpcls_lookup(struct dpcls *cls, const struct netdev_flow_key *keys[],
     }
     return false;
 }
-/* XXX: Temporarily duplicates definition in dpif-offload-dpdk.c. */
-#define DEFAULT_OFFLOAD_THREAD_NB 1
-static unsigned int offload_thread_nb = DEFAULT_OFFLOAD_THREAD_NB;
-
-DECLARE_EXTERN_PER_THREAD_DATA(unsigned int, dpdk_offload_thread_id);
-DEFINE_EXTERN_PER_THREAD_DATA(dpdk_offload_thread_id, OVSTHREAD_ID_UNSET);
-
-unsigned int
-dpdk_offload_thread_id(void)
-{
-    unsigned int id = *dpdk_offload_thread_id_get();
-
-    if (OVS_UNLIKELY(id == OVSTHREAD_ID_UNSET)) {
-        id = dpdk_offload_thread_init();
-    }
-
-    return id;
-}
-
-unsigned int
-dpdk_offload_thread_nb(void)
-{
-    return offload_thread_nb;
-}
-
-void
-dpdk_offload_thread_set_thread_nb(unsigned int thread_nb)
-{
-    offload_thread_nb = thread_nb;
-}
-
-static unsigned int
-dpdk_offload_ufid_to_thread_id(const ovs_u128 ufid)
-{
-    uint32_t ufid_hash;
-
-    if (dpdk_offload_thread_nb() == 1) {
-        return 0;
-    }
-
-    ufid_hash = hash_words64_inline(
-            (const uint64_t [2]){ ufid.u64.lo,
-                                  ufid.u64.hi }, 2, 1);
-    return ufid_hash % dpdk_offload_thread_nb();
-}
-
-static unsigned int
-dpdk_offload_thread_init(void)
-{
-    static atomic_count next_id = ATOMIC_COUNT_INIT(0);
-    bool thread_is_hw_offload;
-    bool thread_is_rcu;
-
-    thread_is_hw_offload = !strncmp(get_subprogram_name(),
-                                    "dpdk_offload", strlen("dpdk_offload"));
-    thread_is_rcu = !strncmp(get_subprogram_name(), "urcu", strlen("urcu"));
-
-    /* Panic if any other thread besides offload and RCU tries
-     * to initialize their thread ID. */
-    ovs_assert(thread_is_hw_offload || thread_is_rcu);
-
-    if (*dpdk_offload_thread_id_get() == OVSTHREAD_ID_UNSET) {
-        unsigned int id;
-
-        if (thread_is_rcu) {
-            /* RCU will compete with other threads for shared object access.
-             * Reclamation functions using a thread ID must be thread-safe.
-             * For that end, and because RCU must consider all potential shared
-             * objects anyway, its thread-id can be whichever, so return 0.
-             */
-            id = 0;
-        } else {
-            /* Only the actual offload threads have their own ID. */
-            id = atomic_count_inc(&next_id);
-        }
-        /* Panic if any offload thread is getting a spurious ID. */
-        ovs_assert(id < dpdk_offload_thread_nb());
-        return *dpdk_offload_thread_id_get() = id;
-    } else {
-        return *dpdk_offload_thread_id_get();
-    }
-}
diff --git a/lib/dpif-netdev.h b/lib/dpif-netdev.h
index 5df344367..6db6ed2e2 100644
--- a/lib/dpif-netdev.h
+++ b/lib/dpif-netdev.h
@@ -38,18 +38,6 @@  bool dpif_is_netdev(const struct dpif *);
 #define NR_QUEUE   1
 #define NR_PMD_THREADS 1
 
-/* Flow offloading info structure. XXX: This needs to be moved once the
- * implementation is migrated to dpif-offload-dpdk. */
-struct dpif_netdev_offload_info {
-    /*
-     * The flow mark id assigned to the flow. If any pkts hit the flow,
-     * it will be in the pkt meta data.
-     */
-    uint32_t flow_mark;
-
-    odp_port_t orig_in_port; /* Originating in_port for tnl flows. */
-};
-
 #ifdef  __cplusplus
 }
 #endif
diff --git a/lib/dpif-offload-dpdk-private.h b/lib/dpif-offload-dpdk-private.h
new file mode 100644
index 000000000..686c52420
--- /dev/null
+++ b/lib/dpif-offload-dpdk-private.h
@@ -0,0 +1,39 @@ 
+/*
+ * Copyright (c) 2025 Red Hat, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef DPIF_OFFLOAD_DPDK_PRIVATE_H
+#define DPIF_OFFLOAD_DPDK_PRIVATE_H
+
+/* Forward declarations of private structures. */
+struct dpif_offload_dpdk;
+struct netdev;
+
+/* DPIF offload dpdk implementation-specific functions.  These should only be
+ * used by the associated netdev offload provider, i.e., netdev-offload-dpdk.
+ */
+unsigned int dpdk_offload_thread_id(void);
+void dpif_offload_dpdk_flow_unreference(struct dpif_offload_dpdk *offload,
+                                        unsigned pmd_id, void *flow_reference);
+uint32_t dpif_offload_dpdk_allocate_flow_mark(struct dpif_offload_dpdk *);
+void dpif_offload_dpdk_free_flow_mark(struct dpif_offload_dpdk *,
+                                      uint32_t flow_mark);
+struct netdev *dpif_offload_dpdk_get_netdev(
+    const struct dpif_offload_dpdk *, odp_port_t port_no);
+void dpif_offload_dpdk_traverse_ports(
+    const struct dpif_offload_dpdk *offload,
+    bool (*cb)(struct netdev *, odp_port_t, void *), void *aux);
+
+#endif /* DPIF_OFFLOAD_DPDK_PRIVATE_H */
diff --git a/lib/dpif-offload-dpdk.c b/lib/dpif-offload-dpdk.c
index 5c3e7f041..547bb68ef 100644
--- a/lib/dpif-offload-dpdk.c
+++ b/lib/dpif-offload-dpdk.c
@@ -19,28 +19,94 @@ 
 
 #include "dpif-offload.h"
 #include "dpif-offload-provider.h"
+#include "dpif-offload-dpdk-private.h"
+#include "id-fpool.h"
+#include "mov-avg.h"
+#include "mpsc-queue.h"
 #include "netdev-offload-dpdk.h"
 #include "netdev-provider.h"
 #include "netdev-vport.h"
 #include "util.h"
+#include "uuid.h"
 
 #include "openvswitch/json.h"
+#include "openvswitch/match.h"
 #include "openvswitch/vlog.h"
 
 VLOG_DEFINE_THIS_MODULE(dpif_offload_dpdk);
 
-#define DEFAULT_OFFLOAD_THREAD_NB 1
-#define MAX_OFFLOAD_THREAD_NB 10
+#define DEFAULT_OFFLOAD_THREAD_COUNT 1
+#define MAX_OFFLOAD_THREAD_COUNT 10
 
-static unsigned int offload_thread_nb = DEFAULT_OFFLOAD_THREAD_NB;
+enum dpdk_offload_type {
+    DPDK_OFFLOAD_FLOW,
+    DPDK_OFFLOAD_FLUSH,
+};
+
+enum {
+    DPDK_NETDEV_FLOW_OFFLOAD_OP_PUT,
+    DPDK_NETDEV_FLOW_OFFLOAD_OP_DEL,
+};
+
+struct dpdk_offload_thread {
+    PADDED_MEMBERS(CACHE_LINE_SIZE,
+        struct mpsc_queue queue;
+        atomic_uint64_t enqueued_item;
+        struct mov_avg_cma cma;
+        struct mov_avg_ema ema;
+        struct dpif_offload_dpdk *offload;
+        pthread_t thread;
+    );
+};
+
+struct dpdk_offload_flow_item {
+    int op;
+    odp_port_t in_port;
+    ovs_u128 ufid;
+    unsigned pmd_id;
+    struct match match;
+    struct nlattr *actions;
+    size_t actions_len;
+    odp_port_t orig_in_port; /* Originating in_port for tunnel flows. */
+    bool requested_stats;
+    void *flow_reference;
+    struct dpif_offload_flow_cb_data callback;
+};
+
+struct dpdk_offload_flush_item {
+    struct netdev *netdev;
+    struct dpif_offload_dpdk *offload;
+    struct ovs_barrier *barrier;
+};
+
+union dpdk_offload_thread_data {
+    struct dpdk_offload_flow_item flow;
+    struct dpdk_offload_flush_item flush;
+};
+
+struct dpdk_offload_thread_item {
+    struct mpsc_queue_node node;
+    enum dpdk_offload_type type;
+    long long int timestamp;
+    union dpdk_offload_thread_data data[0];
+};
 
 /* dpif offload interface for the dpdk rte_flow implementation. */
 struct dpif_offload_dpdk {
     struct dpif_offload offload;
     struct dpif_offload_port_mgr *port_mgr;
 
+    atomic_count next_offload_thread_id;
+    atomic_bool offload_thread_shutdown;
+    struct dpdk_offload_thread *offload_threads;
+
+    struct id_fpool *flow_mark_pool;
+
+    dpif_offload_flow_unreference_cb *unreference_cb;
+
     /* Configuration specific variables. */
     struct ovsthread_once once_enable; /* Track first-time enablement. */
+    unsigned int offload_thread_count; /* Number of offload threads. */
 };
 
 static struct dpif_offload_dpdk *
@@ -50,13 +116,485 @@  dpif_offload_dpdk_cast(const struct dpif_offload *offload)
     return CONTAINER_OF(offload, struct dpif_offload_dpdk, offload);
 }
 
+DECLARE_EXTERN_PER_THREAD_DATA(unsigned int, dpdk_offload_thread_id);
+DEFINE_EXTERN_PER_THREAD_DATA(dpdk_offload_thread_id, OVSTHREAD_ID_UNSET);
+
+uint32_t
+dpif_offload_dpdk_allocate_flow_mark(struct dpif_offload_dpdk *offload)
+{
+    static struct ovsthread_once init_once = OVSTHREAD_ONCE_INITIALIZER;
+    unsigned int uid = dpdk_offload_thread_id() \
+                       % offload->offload_thread_count;
+    uint32_t flow_mark;
+
+    if (ovsthread_once_start(&init_once)) {
+        /* Not initialized yet; create the flow mark pool here. */
+        offload->flow_mark_pool = id_fpool_create(
+            offload->offload_thread_count, 1, UINT32_MAX - 1);
+        ovsthread_once_done(&init_once);
+    }
+
+    if (id_fpool_new_id(offload->flow_mark_pool, uid, &flow_mark)) {
+        return flow_mark;
+    }
+
+    return INVALID_FLOW_MARK;
+}
+
+void
+dpif_offload_dpdk_free_flow_mark(struct dpif_offload_dpdk *offload,
+                                 uint32_t flow_mark)
+{
+    if (flow_mark != INVALID_FLOW_MARK) {
+        unsigned int uid = dpdk_offload_thread_id() \
+                           % offload->offload_thread_count;
+
+        id_fpool_free_id(offload->flow_mark_pool, uid, flow_mark);
+    }
+}
+
+unsigned int
+dpdk_offload_thread_id(void)
+{
+    unsigned int id = *dpdk_offload_thread_id_get();
+
+    if (OVS_UNLIKELY(id == OVSTHREAD_ID_UNSET)) {
+        /* Offload threads get their ID set at initialization, here
+         * only the RCU thread might need initialization. */
+        ovs_assert(!strncmp(get_subprogram_name(), "urcu", strlen("urcu")));
+
+        /* RCU will compete with other threads for shared object access.
+         * Reclamation functions using a thread ID must be thread-safe.
+         * For that end, and because RCU must consider all potential shared
+         * objects anyway, its thread-id can be whichever, so return 0. */
+        id = 0;
+        *dpdk_offload_thread_id_get() = id;
+    }
+
+    return id;
+}
+
+static unsigned int
+dpif_offload_dpdk_ufid_to_thread_id(struct dpif_offload_dpdk *offload,
+                                    const ovs_u128 ufid)
+{
+    uint32_t ufid_hash;
+
+    if (offload->offload_thread_count == 1) {
+        return 0;
+    }
+
+    ufid_hash = hash_words64_inline(
+            (const uint64_t [2]){ ufid.u64.lo,
+                                  ufid.u64.hi }, 2, 1);
+    return ufid_hash % offload->offload_thread_count;
+}
+
+static bool
+dpif_offload_dpdk_is_offloading_netdev(struct dpif_offload_dpdk *offload,
+                                       struct netdev *netdev)
+{
+    const struct dpif_offload *netdev_offload;
+
+    netdev_offload = ovsrcu_get(const struct dpif_offload *,
+                                &netdev->dpif_offload);
+
+    return netdev_offload == &offload->offload;
+}
+
+static struct dpdk_offload_thread_item *
+dpif_offload_dpdk_alloc_flow_offload(int op)
+{
+    struct dpdk_offload_thread_item *item;
+    struct dpdk_offload_flow_item *flow_offload;
+
+    item = xzalloc(sizeof *item + sizeof *flow_offload);
+    flow_offload = &item->data->flow;
+
+    item->type = DPDK_OFFLOAD_FLOW;
+    flow_offload->op = op;
+
+    return item;
+}
+
+static void
+dpif_offload_dpdk_free_flow_offload__(struct dpdk_offload_thread_item *offload)
+{
+    struct dpdk_offload_flow_item *flow_offload = &offload->data->flow;
+
+    free(flow_offload->actions);
+    free(offload);
+}
+
+static void
+dpif_offload_dpdk_free_flow_offload(struct dpdk_offload_thread_item *offload)
+{
+    ovsrcu_postpone(dpif_offload_dpdk_free_flow_offload__, offload);
+}
+
+static void
+dpif_offload_dpdk_free_offload(struct dpdk_offload_thread_item *offload)
+{
+    switch (offload->type) {
+    case DPDK_OFFLOAD_FLOW:
+        dpif_offload_dpdk_free_flow_offload(offload);
+        break;
+    case DPDK_OFFLOAD_FLUSH:
+        free(offload);
+        break;
+    default:
+        OVS_NOT_REACHED();
+    };
+}
+
+static void
+dpif_offload_dpdk_append_offload(const struct dpif_offload_dpdk *offload,
+                                 struct dpdk_offload_thread_item *item,
+                                 unsigned int tid)
+{
+    ovs_assert(offload->offload_threads);
+
+    mpsc_queue_insert(&offload->offload_threads[tid].queue, &item->node);
+    atomic_count_inc64(&offload->offload_threads[tid].enqueued_item);
+}
+
+static void
+dpif_offload_dpdk_offload_flow_enqueue(struct dpif_offload_dpdk *offload,
+                                       struct dpdk_offload_thread_item *item)
+{
+    struct dpdk_offload_flow_item *flow_offload = &item->data->flow;
+    unsigned int tid;
+
+    ovs_assert(item->type == DPDK_OFFLOAD_FLOW);
+
+    tid = dpif_offload_dpdk_ufid_to_thread_id(offload, flow_offload->ufid);
+    dpif_offload_dpdk_append_offload(offload, item, tid);
+}
+
+static int
+dpif_offload_dpdk_offload_del(struct dpdk_offload_thread *thread,
+                              struct dpdk_offload_thread_item *item)
+{
+    struct dpdk_offload_flow_item *flow = &item->data->flow;
+    struct dpif_flow_stats stats;
+    struct netdev *netdev;
+    int error;
+
+    netdev = dpif_offload_dpdk_get_netdev(thread->offload, flow->in_port);
+
+    if (!netdev) {
+        VLOG_DBG("Failed to find netdev for port_id %d", flow->in_port);
+        error = ENODEV;
+        goto do_callback;
+    }
+
+    /* Note that we are responsible for tracking offloads per PMD. We pass
+     * the delete request directly to the dpdk netdev offload code, which
+     * will handle the actual hardware offloaded flow.  It will only remove it
+     * when no other PMD needs it. */
+    error = netdev_offload_dpdk_flow_del(thread->offload, netdev,
+                                         flow->pmd_id, &flow->ufid,
+                                         flow->flow_reference,
+                                         flow->requested_stats ? &stats
+                                                               : NULL);
+
+do_callback:
+    dpif_offload_datapath_flow_op_continue(&flow->callback,
+                                           flow->requested_stats ? &stats
+                                                                 : NULL,
+                                           flow->pmd_id, flow->flow_reference,
+                                           NULL,
+                                           error);
+    return error;
+}
+
+static int
+dpif_offload_dpdk_offload_put(struct dpdk_offload_thread *thread,
+                              struct dpdk_offload_thread_item *item)
+{
+    struct dpdk_offload_flow_item *flow = &item->data->flow;
+    void *previous_flow_reference = NULL;
+    struct dpif_flow_stats stats;
+    struct netdev *netdev;
+    int error;
+
+    netdev = dpif_offload_dpdk_get_netdev(thread->offload, flow->in_port);
+
+    if (!netdev) {
+        VLOG_DBG("Failed to find netdev for port_id %d", flow->in_port);
+        error = ENODEV;
+        goto do_callback;
+    }
+
+    if (!dpif_offload_dpdk_is_offloading_netdev(thread->offload, netdev)) {
+        error = EUNATCH;
+        goto do_callback;
+    }
+
+    /* Note that we are responsible for tracking offloads per PMD. We pass
+     * the put request directly to the dpdk netdev offload code, which
+     * will handle the actual hardware offloaded flow.  It will only add it
+     * when no other PMD has it offloaded. */
+    error = netdev_offload_dpdk_flow_put(thread->offload, flow->pmd_id,
+                                         flow->flow_reference,
+                                         netdev, &flow->match,
+                                         CONST_CAST(struct nlattr *,
+                                                    flow->actions),
+                                         flow->actions_len, &flow->ufid,
+                                         flow->orig_in_port,
+                                         &previous_flow_reference,
+                                         flow->requested_stats
+                                             ? &stats
+                                             : NULL);
+do_callback:
+    dpif_offload_datapath_flow_op_continue(&flow->callback,
+                                           flow->requested_stats ? &stats
+                                                                 : NULL,
+                                           flow->pmd_id, flow->flow_reference,
+                                           previous_flow_reference,
+                                           error);
+    return error;
+}
+
+static void
+dpif_offload_dpdk_offload_flow(struct dpdk_offload_thread *thread,
+                               struct dpdk_offload_thread_item *item)
+{
+    struct dpdk_offload_flow_item *flow_offload = &item->data->flow;
+    const char *op;
+    int ret;
+
+    switch (flow_offload->op) {
+    case DPDK_NETDEV_FLOW_OFFLOAD_OP_PUT:
+        op = "put";
+        ret = dpif_offload_dpdk_offload_put(thread, item);
+        break;
+    case DPDK_NETDEV_FLOW_OFFLOAD_OP_DEL:
+        op = "delete";
+        ret = dpif_offload_dpdk_offload_del(thread, item);
+        break;
+    default:
+        OVS_NOT_REACHED();
+    }
+
+    VLOG_DBG("%s to %s netdev flow "UUID_FMT,
+             ret == 0 ? "succeed" : "failed", op,
+             UUID_ARGS((struct uuid *) &flow_offload->ufid));
+}
+
+static void
+dpif_offload_dpdk_offload_flush(struct dpdk_offload_thread_item *item)
+{
+    struct dpdk_offload_flush_item *flush = &item->data->flush;
+
+    netdev_offload_dpdk_flow_flush(flush->offload, flush->netdev);
+    ovs_barrier_block(flush->barrier);
+}
+
+#define DPDK_OFFLOAD_BACKOFF_MIN 1
+#define DPDK_OFFLOAD_BACKOFF_MAX 64
+#define DPDK_OFFLOAD_QUIESCE_INTERVAL_US (10 * 1000) /* 10 ms */
+
+static void *
+dpif_offload_dpdk_offload_thread_main(void *arg)
+{
+    struct dpdk_offload_thread *ofl_thread = arg;
+    struct dpdk_offload_thread_item *offload;
+    struct mpsc_queue_node *node;
+    struct mpsc_queue *queue;
+    long long int latency_us;
+    long long int next_rcu;
+    uint64_t backoff;
+    bool exiting;
+
+    if (*dpdk_offload_thread_id_get() == OVSTHREAD_ID_UNSET) {
+        unsigned int id;
+
+        id = atomic_count_inc(&ofl_thread->offload->next_offload_thread_id);
+
+        /* Panic if any offload thread is getting a spurious ID. */
+        ovs_assert(id < ofl_thread->offload->offload_thread_count);
+
+        *dpdk_offload_thread_id_get() = id;
+    }
+
+    queue = &ofl_thread->queue;
+    mpsc_queue_acquire(queue);
+
+    do {
+        long long int now;
+
+        backoff = DPDK_OFFLOAD_BACKOFF_MIN;
+        while (mpsc_queue_tail(queue) == NULL) {
+            xnanosleep(backoff * 1E6);
+            if (backoff < DPDK_OFFLOAD_BACKOFF_MAX) {
+                backoff <<= 1;
+            }
+
+            atomic_read_relaxed(&ofl_thread->offload->offload_thread_shutdown,
+                                &exiting);
+            if (exiting) {
+                goto exit_thread;
+            }
+        }
+
+        now = time_usec();
+        next_rcu = now + DPDK_OFFLOAD_QUIESCE_INTERVAL_US;
+        MPSC_QUEUE_FOR_EACH_POP (node, queue) {
+            offload = CONTAINER_OF(node, struct dpdk_offload_thread_item,
+                                   node);
+            atomic_count_dec64(&ofl_thread->enqueued_item);
+
+            switch (offload->type) {
+            case DPDK_OFFLOAD_FLOW:
+                dpif_offload_dpdk_offload_flow(ofl_thread, offload);
+                break;
+            case DPDK_OFFLOAD_FLUSH:
+                dpif_offload_dpdk_offload_flush(offload);
+                break;
+            default:
+                OVS_NOT_REACHED();
+            }
+
+            now = time_usec();
+            latency_us = now - offload->timestamp;
+            mov_avg_cma_update(&ofl_thread->cma, latency_us);
+            mov_avg_ema_update(&ofl_thread->ema, latency_us);
+
+            dpif_offload_dpdk_free_offload(offload);
+
+            /* Do RCU synchronization at fixed interval. */
+            if (now > next_rcu) {
+                ovsrcu_quiesce();
+                next_rcu = time_usec() + DPDK_OFFLOAD_QUIESCE_INTERVAL_US;
+            }
+        }
+
+        atomic_read_relaxed(&ofl_thread->offload->offload_thread_shutdown,
+                            &exiting);
+    } while (!exiting);
+
+exit_thread:
+    mpsc_queue_release(queue);
+    return NULL;
+}
+
+static void
+dpif_offload_dpdk_offload_threads_init(struct dpif_offload_dpdk *offload)
+{
+    offload->offload_threads = xcalloc(offload->offload_thread_count,
+                                       sizeof(struct dpdk_offload_thread));
+
+    for (unsigned int tid = 0; tid < offload->offload_thread_count; tid++) {
+        struct dpdk_offload_thread *thread;
+
+        thread = &offload->offload_threads[tid];
+        mpsc_queue_init(&thread->queue);
+        atomic_init(&thread->enqueued_item, 0);
+        mov_avg_cma_init(&thread->cma);
+        mov_avg_ema_init(&thread->ema, 100);
+        thread->offload = offload;
+        thread->thread = ovs_thread_create(
+            "dpdk_offload", dpif_offload_dpdk_offload_thread_main, thread);
+    }
+}
+
+static long long int
+dpif_offload_dpdk_get_timestamp(void)
+{
+    /* XXX: We should look for a better, more efficient way to obtain a
+     * timestamp in the fast path, if only used for gathering statistics. */
+    return time_usec();
+}
+
+static void
+dpif_offload_dpdk_flush_enqueue(struct dpif_offload_dpdk *offload,
+                                struct netdev *netdev,
+                                struct ovs_barrier *barrier)
+{
+    unsigned int tid;
+    long long int now_us = dpif_offload_dpdk_get_timestamp();
+
+    if (!dpif_offload_is_offload_enabled()) {
+        return;
+    }
+
+    for (tid = 0; tid < offload->offload_thread_count; tid++) {
+        struct dpdk_offload_thread_item *item;
+        struct dpdk_offload_flush_item *flush;
+
+        item = xmalloc(sizeof *item + sizeof *flush);
+        item->type = DPDK_OFFLOAD_FLUSH;
+        item->timestamp = now_us;
+
+        flush = &item->data->flush;
+        flush->netdev = netdev;
+        flush->offload = offload;
+        flush->barrier = barrier;
+
+        dpif_offload_dpdk_append_offload(offload, item, tid);
+    }
+}
+
+/* Blocking call that will wait on the offload thread to
+ * complete its work.  As the flush order will only be
+ * enqueued after existing offload requests, those previous
+ * offload requests must be processed.
+ *
+ * Flow offload flush is done when a port is being deleted.
+ * Right before this call executes, the offload API is disabled
+ * for the port. This call must be made blocking until the
+ * offload provider completed its job. */
+static void
+dpif_offload_dpdk_flush(struct dpif_offload_dpdk *offload,
+                        struct netdev *netdev)
+{
+    /* The flush mutex provides mutually exclusive access to the static
+     * barrier, and prevents multiple concurrent flush orders across threads.
+     *
+     * The memory barrier needs to go beyond the function scope as
+     * the other threads can resume from blocking after this function
+     * already finished.
+     *
+     * Additionally, because the flush operation is blocking, it would
+     * deadlock if multiple offload threads were blocking on several
+     * different barriers. Only allow a single flush order in the offload
+     * queue at a time. */
+    static struct ovs_mutex flush_mutex = OVS_MUTEX_INITIALIZER;
+    static struct ovs_barrier barrier OVS_GUARDED_BY(flush_mutex);
+
+    ovs_mutex_lock(&flush_mutex);
+
+    ovs_barrier_init(&barrier, 1 + offload->offload_thread_count);
+
+    dpif_offload_dpdk_flush_enqueue(offload, netdev, &barrier);
+    ovs_barrier_block(&barrier);
+    ovs_barrier_destroy(&barrier);
+
+    ovs_mutex_unlock(&flush_mutex);
+}
+
+void dpif_offload_dpdk_traverse_ports(
+    const struct dpif_offload_dpdk *offload,
+    bool (*cb)(struct netdev *, odp_port_t, void *), void *aux)
+{
+    struct dpif_offload_port_mgr_port *port;
+
+    DPIF_OFFLOAD_PORT_MGR_PORT_FOR_EACH (port, offload->port_mgr) {
+        if (cb(port->netdev, port->port_no, aux)) {
+            break;
+        }
+    }
+}
+
 static int
 dpif_offload_dpdk_enable_offload(struct dpif_offload *offload_,
                                  struct dpif_offload_port_mgr_port *port)
 {
+    struct dpif_offload_dpdk *offload = dpif_offload_dpdk_cast(offload_);
     struct netdev *netdev = port->netdev;
 
-    netdev_offload_dpdk_init(netdev);
+    netdev_offload_dpdk_init(netdev, offload->offload_thread_count);
     dpif_offload_set_netdev_offload(netdev, offload_);
     return 0;
 }
@@ -98,6 +636,16 @@  dpif_offload_dpdk_port_del(struct dpif_offload *offload_, odp_port_t port_no)
     struct dpif_offload_port_mgr_port *port;
     int ret = 0;
 
+    port = dpif_offload_port_mgr_find_by_odp_port(offload->port_mgr, port_no);
+
+    if (dpif_offload_is_offload_enabled() && port) {
+        /* If hardware offload is enabled, we first need to flush (complete)
+         * all pending flow operations, especially the pending delete ones,
+         * before we remove the netdev from the port_mgr list. */
+        dpif_offload_set_netdev_offload(port->netdev, NULL);
+        dpif_offload_dpdk_flush(offload, port->netdev);
+    }
+
     port = dpif_offload_port_mgr_remove(offload->port_mgr, port_no, true);
     if (port) {
         if (dpif_offload_is_offload_enabled()) {
@@ -138,13 +686,14 @@  dpif_offload_dpdk_port_dump_done(const struct dpif_offload *offload_,
     return dpif_offload_port_mgr_port_dump_done(offload->port_mgr, state);
 }
 
-static struct netdev *
-dpif_offload_dpdk_get_netdev(struct dpif_offload *offload_, odp_port_t port_no)
+struct netdev *
+dpif_offload_dpdk_get_netdev(const struct dpif_offload_dpdk *offload,
+                             odp_port_t port_no)
 {
-    struct dpif_offload_dpdk *offload = dpif_offload_dpdk_cast(offload_);
     struct dpif_offload_port_mgr_port *port;
 
-    port = dpif_offload_port_mgr_find_by_odp_port(offload->port_mgr, port_no);
+    port = dpif_offload_port_mgr_find_by_odp_port(offload->port_mgr,
+                                                  port_no);
     if (!port) {
         return NULL;
     }
@@ -152,6 +701,15 @@  dpif_offload_dpdk_get_netdev(struct dpif_offload *offload_, odp_port_t port_no)
     return port->netdev;
 }
 
+static struct netdev *
+dpif_offload_dpdk_get_netdev_(struct dpif_offload *offload_,
+                              odp_port_t port_no)
+{
+    struct dpif_offload_dpdk *offload = dpif_offload_dpdk_cast(offload_);
+
+    return dpif_offload_dpdk_get_netdev(offload, port_no);
+}
+
 static int
 dpif_offload_dpdk_open(const struct dpif_offload_class *offload_class,
                        struct dpif *dpif, struct dpif_offload **offload_)
@@ -163,6 +721,12 @@  dpif_offload_dpdk_open(const struct dpif_offload_class *offload_class,
     dpif_offload_init(&offload->offload, offload_class, dpif);
     offload->port_mgr = dpif_offload_port_mgr_init();
     offload->once_enable = (struct ovsthread_once) OVSTHREAD_ONCE_INITIALIZER;
+    offload->offload_thread_count = DEFAULT_OFFLOAD_THREAD_COUNT;
+    offload->offload_threads = NULL;
+    atomic_count_init(&offload->next_offload_thread_id, 0);
+    atomic_init(&offload->offload_thread_shutdown, false);
+    offload->flow_mark_pool = NULL;
+    offload->unreference_cb = NULL;
 
     *offload_ = &offload->offload;
     return 0;
@@ -187,7 +751,20 @@  dpif_offload_dpdk_close(struct dpif_offload *offload_)
                                          dpif_offload_dpdk_cleanup_port,
                                          offload_);
 
+    atomic_store_relaxed(&offload->offload_thread_shutdown, true);
+    if (offload->offload_threads) {
+        for (int i = 0; i < offload->offload_thread_count; i++) {
+            xpthread_join(offload->offload_threads[i].thread, NULL);
+            mpsc_queue_destroy(&offload->offload_threads[i].queue);
+        }
+        free(offload->offload_threads);
+    }
+
     dpif_offload_port_mgr_uninit(offload->port_mgr);
+    if (offload->flow_mark_pool) {
+        id_fpool_destroy(offload->flow_mark_pool);
+    }
+
     free(offload);
 }
 
@@ -200,9 +777,6 @@  static bool dpif_offload_dpdk_late_enable(struct dpif_offload_port_mgr_port *p,
     return false;
 }
 
-/* XXX: External reference, will be removed after full integration. */
-void dpdk_offload_thread_set_thread_nb(unsigned int thread_nb);
-
 static void
 dpif_offload_dpdk_set_config(struct dpif_offload *offload_,
                              const struct smap *other_cfg)
@@ -215,24 +789,22 @@  dpif_offload_dpdk_set_config(struct dpif_offload *offload_,
     if (smap_get_bool(other_cfg, "hw-offload", false)) {
         if (ovsthread_once_start(&offload->once_enable)) {
 
-            offload_thread_nb = smap_get_ullong(other_cfg,
-                                                "n-offload-threads",
-                                                DEFAULT_OFFLOAD_THREAD_NB);
-            if (offload_thread_nb == 0 ||
-                offload_thread_nb > MAX_OFFLOAD_THREAD_NB) {
+            unsigned int offload_thread_count = smap_get_uint(
+                other_cfg, "n-offload-threads", DEFAULT_OFFLOAD_THREAD_COUNT);
+
+            if (offload_thread_count == 0 ||
+                offload_thread_count > MAX_OFFLOAD_THREAD_COUNT) {
                 VLOG_WARN("netdev: Invalid number of threads requested: %u",
-                          offload_thread_nb);
-                offload_thread_nb = DEFAULT_OFFLOAD_THREAD_NB;
+                          offload_thread_count);
+                offload_thread_count = DEFAULT_OFFLOAD_THREAD_COUNT;
             }
 
-            if (smap_get(other_cfg, "n-offload-threads")) {
-                VLOG_INFO("Flow API using %u thread%s",
-                          offload_thread_nb,
-                          offload_thread_nb > 1 ? "s" : "");
-            }
+            VLOG_INFO("Flow API using %u thread%s", offload_thread_count,
+                      offload_thread_count > 1 ? "s" : "");
 
-            dpdk_offload_thread_set_thread_nb(offload_thread_nb);
+            offload->offload_thread_count = offload_thread_count;
 
+            dpif_offload_dpdk_offload_threads_init(offload);
             dpif_offload_port_mgr_traverse_ports(offload->port_mgr,
                                                  dpif_offload_dpdk_late_enable,
                                                  offload);
@@ -307,13 +879,22 @@  dpif_offload_dpdk_can_offload(struct dpif_offload *offload OVS_UNUSED,
     return netdev_dpdk_flow_api_supported(netdev, true);
 }
 
+struct get_n_offload_cb_aux {
+    uint64_t *total;
+    union {
+        unsigned int offload_thread_count;
+        unsigned int offload_thread_id;
+    };
+};
+
 static bool
-dpif_offload_dpdk_get_n_offloaded_cb(
-    struct dpif_offload_port_mgr_port *port, void *aux)
+dpif_offload_dpdk_get_n_offloaded_cb(struct dpif_offload_port_mgr_port *port,
+                                     void *aux_)
 {
-    uint64_t *total = aux;
+    struct get_n_offload_cb_aux *aux = aux_;
 
-    *total += netdev_offload_dpdk_flow_get_n_offloaded(port->netdev);
+    *aux->total += netdev_offload_dpdk_flow_get_n_offloaded(
+        port->netdev, aux->offload_thread_count);
     return false;
 }
 
@@ -323,29 +904,254 @@  dpif_offload_dpdk_get_n_offloaded(const struct dpif_offload *offload_)
     struct dpif_offload_dpdk *offload = dpif_offload_dpdk_cast(offload_);
     uint64_t total = 0;
 
+    struct get_n_offload_cb_aux aux = {
+        .offload_thread_count = offload->offload_thread_count,
+        .total = &total,
+    };
+
+    if (!dpif_offload_is_offload_enabled()) {
+        return 0;
+    }
+
+    dpif_offload_port_mgr_traverse_ports(offload->port_mgr,
+                                         dpif_offload_dpdk_get_n_offloaded_cb,
+                                         &aux);
+    return total;
+}
+
+static bool
+dpif_offload_dpdk_get_n_offloaded_by_thread_cb(
+    struct dpif_offload_port_mgr_port *port, void *aux_)
+{
+    struct get_n_offload_cb_aux *aux = aux_;
+
+    *aux->total += netdev_offload_dpdk_flow_get_n_offloaded_by_thread(
+        port->netdev, aux->offload_thread_id);
+    return false;
+}
+
+static uint64_t
+dpif_offload_dpdk_get_n_offloaded_by_thread(struct dpif_offload_dpdk *offload,
+                                            unsigned int tid)
+{
+    uint64_t total = 0;
+
+    struct get_n_offload_cb_aux aux = {
+        .offload_thread_id = tid,
+        .total = &total,
+    };
+
     if (!dpif_offload_is_offload_enabled()) {
         return 0;
     }
 
     dpif_offload_port_mgr_traverse_ports(
-        offload->port_mgr, dpif_offload_dpdk_get_n_offloaded_cb, &total);
+        offload->port_mgr,
+        dpif_offload_dpdk_get_n_offloaded_by_thread_cb,
+        &aux);
 
     return total;
 }
 
 static int
-dpif_offload_dpdk_netdev_flow_flush(const struct dpif_offload *offload
-                                    OVS_UNUSED, struct netdev *netdev)
+dpif_offload_dpdk_netdev_hw_miss_packet_postprocess(
+    const struct dpif_offload *offload_, struct netdev *netdev,
+    unsigned pmd_id, struct dp_packet *packet, void **flow_reference)
+{
+    struct dpif_offload_dpdk *offload = dpif_offload_dpdk_cast(offload_);
+
+    return netdev_offload_dpdk_hw_miss_packet_recover(offload, netdev, pmd_id,
+                                                      packet, flow_reference);
+}
+
+static int
+dpif_offload_dpdk_netdev_flow_put(const struct dpif_offload *offload_,
+                                  struct netdev *netdev OVS_UNUSED,
+                                  struct dpif_offload_flow_put *put,
+                                  void **previous_flow_reference OVS_UNUSED)
 {
-    return netdev_offload_dpdk_flow_flush(netdev);
+    struct dpif_offload_dpdk *offload = dpif_offload_dpdk_cast(offload_);
+    struct dpdk_offload_thread_item *item;
+    struct dpdk_offload_flow_item *flow_offload;
+
+    item = dpif_offload_dpdk_alloc_flow_offload(
+        DPDK_NETDEV_FLOW_OFFLOAD_OP_PUT);
+    item->timestamp = dpif_offload_dpdk_get_timestamp();
+
+    flow_offload = &item->data->flow;
+    flow_offload->in_port = put->in_port;
+    flow_offload->ufid = *put->ufid;
+    flow_offload->pmd_id = put->pmd_id;
+    flow_offload->flow_reference = put->flow_reference;
+    flow_offload->match = *put->match;
+    flow_offload->actions = xmalloc(put->actions_len);
+    nullable_memcpy(flow_offload->actions, put->actions, put->actions_len);
+    flow_offload->actions_len = put->actions_len;
+    flow_offload->orig_in_port = put->orig_in_port;
+    flow_offload->requested_stats = !!put->stats;
+    flow_offload->callback = put->cb_data;
+
+    dpif_offload_dpdk_offload_flow_enqueue(offload, item);
+    return EINPROGRESS;
 }
 
 static int
-dpif_offload_dpdk_netdev_hw_miss_packet_postprocess(
-    const struct dpif_offload *offload_ OVS_UNUSED, struct netdev *netdev,
-    struct dp_packet *packet)
+dpif_offload_dpdk_netdev_flow_del(const struct dpif_offload *offload_,
+                                  struct netdev *netdev OVS_UNUSED,
+                                  struct dpif_offload_flow_del *del)
+{
+    struct dpif_offload_dpdk *offload = dpif_offload_dpdk_cast(offload_);
+    struct dpdk_offload_thread_item *item;
+    struct dpdk_offload_flow_item *flow_offload;
+
+    item = dpif_offload_dpdk_alloc_flow_offload(
+        DPDK_NETDEV_FLOW_OFFLOAD_OP_DEL);
+    item->timestamp = dpif_offload_dpdk_get_timestamp();
+
+    flow_offload = &item->data->flow;
+    flow_offload->in_port = del->in_port;
+    flow_offload->requested_stats = !!del->stats;
+    flow_offload->ufid = *del->ufid;
+    flow_offload->flow_reference = del->flow_reference;
+    flow_offload->pmd_id = del->pmd_id;
+    flow_offload->callback = del->cb_data;
+
+    dpif_offload_dpdk_offload_flow_enqueue(offload, item);
+    return EINPROGRESS;
+}
+
+static void
+dpif_offload_dpdk_register_flow_unreference_cb(
+    const struct dpif_offload *offload_, dpif_offload_flow_unreference_cb *cb)
+{
+    struct dpif_offload_dpdk *offload = dpif_offload_dpdk_cast(offload_);
+
+    offload->unreference_cb = cb;
+}
+
+void
+dpif_offload_dpdk_flow_unreference(struct dpif_offload_dpdk *offload,
+                                   unsigned pmd_id, void *flow_reference)
+{
+    if (offload->unreference_cb) {
+        offload->unreference_cb(pmd_id, flow_reference);
+    }
+}
+
+static bool
+dpif_offload_dpdk_netdev_flow_stats(const struct dpif_offload *ol OVS_UNUSED,
+                                    struct netdev *netdev,
+                                    const ovs_u128 *ufid,
+                                    struct dpif_flow_stats *stats,
+                                    struct dpif_flow_attrs *attrs)
+{
+    uint64_t act_buf[1024 / 8];
+    struct nlattr *actions;
+    struct match match;
+    struct ofpbuf buf;
+
+    ofpbuf_use_stack(&buf, &act_buf, sizeof act_buf);
+
+    return !netdev_offload_dpdk_flow_get(netdev, &match, &actions,
+                                         ufid, stats, attrs, &buf);
+}
+
+static int
+dpif_offload_dpdk_get_global_stats(const struct dpif_offload *offload_,
+                                   struct netdev_custom_stats *stats)
 {
-    return netdev_offload_dpdk_hw_miss_packet_recover(netdev, packet);
+    struct dpif_offload_dpdk *offload = dpif_offload_dpdk_cast(offload_);
+    unsigned int nb_thread = offload->offload_thread_count;
+    struct dpdk_offload_thread *offload_threads = offload->offload_threads;
+    unsigned int tid;
+    size_t i;
+
+    enum {
+        DP_NETDEV_HW_OFFLOADS_STATS_ENQUEUED,
+        DP_NETDEV_HW_OFFLOADS_STATS_INSERTED,
+        DP_NETDEV_HW_OFFLOADS_STATS_LAT_CMA_MEAN,
+        DP_NETDEV_HW_OFFLOADS_STATS_LAT_CMA_STDDEV,
+        DP_NETDEV_HW_OFFLOADS_STATS_LAT_EMA_MEAN,
+        DP_NETDEV_HW_OFFLOADS_STATS_LAT_EMA_STDDEV,
+    };
+    struct {
+        const char *name;
+        uint64_t total;
+    } hwol_stats[] = {
+        [DP_NETDEV_HW_OFFLOADS_STATS_ENQUEUED] =
+            { "                Enqueued offloads", 0 },
+        [DP_NETDEV_HW_OFFLOADS_STATS_INSERTED] =
+            { "                Inserted offloads", 0 },
+        [DP_NETDEV_HW_OFFLOADS_STATS_LAT_CMA_MEAN] =
+            { "  Cumulative Average latency (us)", 0 },
+        [DP_NETDEV_HW_OFFLOADS_STATS_LAT_CMA_STDDEV] =
+            { "   Cumulative Latency stddev (us)", 0 },
+        [DP_NETDEV_HW_OFFLOADS_STATS_LAT_EMA_MEAN] =
+            { " Exponential Average latency (us)", 0 },
+        [DP_NETDEV_HW_OFFLOADS_STATS_LAT_EMA_STDDEV] =
+            { "  Exponential Latency stddev (us)", 0 },
+    };
+
+    if (!dpif_offload_is_offload_enabled() || !nb_thread) {
+        /* Leave stats structure untouched on error per API guidelines. */
+        return EINVAL;
+    }
+
+    stats->label = xstrdup(dpif_offload_name(offload_));
+
+    /* nb_thread counters for the overall total as well. */
+    stats->size = ARRAY_SIZE(hwol_stats) * (nb_thread + 1);
+    stats->counters = xcalloc(stats->size, sizeof *stats->counters);
+
+    for (tid = 0; tid < nb_thread; tid++) {
+        uint64_t counts[ARRAY_SIZE(hwol_stats)];
+        size_t idx = ((tid + 1) * ARRAY_SIZE(hwol_stats));
+
+        memset(counts, 0, sizeof counts);
+        if (offload_threads != NULL) {
+            counts[DP_NETDEV_HW_OFFLOADS_STATS_INSERTED] =
+                dpif_offload_dpdk_get_n_offloaded_by_thread(offload, tid);
+
+            atomic_read_relaxed(&offload_threads[tid].enqueued_item,
+                                &counts[DP_NETDEV_HW_OFFLOADS_STATS_ENQUEUED]);
+
+            counts[DP_NETDEV_HW_OFFLOADS_STATS_LAT_CMA_MEAN] =
+                mov_avg_cma(&offload_threads[tid].cma);
+            counts[DP_NETDEV_HW_OFFLOADS_STATS_LAT_CMA_STDDEV] =
+                mov_avg_cma_std_dev(&offload_threads[tid].cma);
+
+            counts[DP_NETDEV_HW_OFFLOADS_STATS_LAT_EMA_MEAN] =
+                mov_avg_ema(&offload_threads[tid].ema);
+            counts[DP_NETDEV_HW_OFFLOADS_STATS_LAT_EMA_STDDEV] =
+                mov_avg_ema_std_dev(&offload_threads[tid].ema);
+        }
+
+        for (i = 0; i < ARRAY_SIZE(hwol_stats); i++) {
+            snprintf(stats->counters[idx + i].name,
+                     sizeof(stats->counters[idx + i].name),
+                     "  [%3u] %s", tid, hwol_stats[i].name);
+            stats->counters[idx + i].value = counts[i];
+            hwol_stats[i].total += counts[i];
+        }
+    }
+
+    /* Do an average of the average for the aggregate. */
+    hwol_stats[DP_NETDEV_HW_OFFLOADS_STATS_LAT_CMA_MEAN].total /= nb_thread;
+    hwol_stats[DP_NETDEV_HW_OFFLOADS_STATS_LAT_CMA_STDDEV].total /= nb_thread;
+    hwol_stats[DP_NETDEV_HW_OFFLOADS_STATS_LAT_EMA_MEAN].total /= nb_thread;
+    hwol_stats[DP_NETDEV_HW_OFFLOADS_STATS_LAT_EMA_STDDEV].total /= nb_thread;
+
+    /* Get the total offload count. */
+    hwol_stats[DP_NETDEV_HW_OFFLOADS_STATS_INSERTED].total =
+        dpif_offload_dpdk_get_n_offloaded(offload_);
+
+    for (i = 0; i < ARRAY_SIZE(hwol_stats); i++) {
+        snprintf(stats->counters[i].name, sizeof(stats->counters[i].name),
+                 "  Total %s", hwol_stats[i].name);
+        stats->counters[i].value = hwol_stats[i].total;
+    }
+
+    return 0;
 }
 
 struct dpif_offload_class dpif_offload_dpdk_class = {
@@ -358,6 +1164,7 @@  struct dpif_offload_class dpif_offload_dpdk_class = {
     .close = dpif_offload_dpdk_close,
     .set_config = dpif_offload_dpdk_set_config,
     .get_debug = dpif_offload_dpdk_get_debug,
+    .get_global_stats = dpif_offload_dpdk_get_global_stats,
     .can_offload = dpif_offload_dpdk_can_offload,
     .port_add = dpif_offload_dpdk_port_add,
     .port_del = dpif_offload_dpdk_port_del,
@@ -365,8 +1172,12 @@  struct dpif_offload_class dpif_offload_dpdk_class = {
     .port_dump_next = dpif_offload_dpdk_port_dump_next,
     .port_dump_done = dpif_offload_dpdk_port_dump_done,
     .flow_get_n_offloaded = dpif_offload_dpdk_get_n_offloaded,
-    .get_netdev = dpif_offload_dpdk_get_netdev,
-    .netdev_flow_flush = dpif_offload_dpdk_netdev_flow_flush,
+    .get_netdev = dpif_offload_dpdk_get_netdev_,
     .netdev_hw_miss_packet_postprocess = \
         dpif_offload_dpdk_netdev_hw_miss_packet_postprocess,
+    .netdev_flow_put = dpif_offload_dpdk_netdev_flow_put,
+    .netdev_flow_del = dpif_offload_dpdk_netdev_flow_del,
+    .netdev_flow_stats = dpif_offload_dpdk_netdev_flow_stats,
+    .register_flow_unreference_cb = \
+        dpif_offload_dpdk_register_flow_unreference_cb,
 };
diff --git a/lib/dpif-offload-provider.h b/lib/dpif-offload-provider.h
index b5c4415c7..f071d1dbb 100644
--- a/lib/dpif-offload-provider.h
+++ b/lib/dpif-offload-provider.h
@@ -281,10 +281,6 @@  struct dpif_offload_class {
      * reasons.  They are intended for use in fast path processing and should
      * be designed with speed and efficiency in mind. */
 
-    /* Deletes all offloaded flows on this netdev.  Return 0 if successful,
-     *  otherwise returns a positive errno value. */
-    int (*netdev_flow_flush)(const struct dpif_offload *, struct netdev *);
-
     /* Recover and/or set the packet state (contents and metadata) for
      * continued processing in software, and perform any additional
      * post-processing required by the offload provider.
@@ -292,26 +288,32 @@  struct dpif_offload_class {
      * Return 0 if successful and the packet requires further processing;
      * otherwise, return a positive errno value and take ownership of the
      * packet if errno != EOPNOTSUPP.  Return ECANCELED if the packet was
-     * fully consumed by the provider for non-error conditions. */
+     * fully consumed by the provider for non-error conditions.
+     *
+     * When zero (0) is returned, the 'flow_reference' pointer may reference
+     * the flow_reference passed to the matching flow.  This can be used to
+     * support partial offloads.  The returned pointer must remain valid until
+     * the end of the next RCU grace period. */
     int (*netdev_hw_miss_packet_postprocess)(const struct dpif_offload *,
-                                             struct netdev *,
-                                             struct dp_packet *);
+                                             struct netdev *, unsigned pmd_id,
+                                             struct dp_packet *,
+                                             void **flow_reference);
 
     /* Add or modify the specified flow directly in the offload datapath.
      * The actual implementation may choose to handle the offload
      * asynchronously by returning EINPROGRESS and invoking the supplied
-     * 'callback' once completed.  For successful synchronous handling, the
+     * 'callback' once completed.  If the flow is handled asynchronously, the
+     * order should be guaranteed.  For successful synchronous handling, the
      * callback must not be called, and 0 should be returned.  If this call is
      * not successful, a positive errno value should be returned. */
     int (*netdev_flow_put)(const struct dpif_offload *, struct netdev *,
                            struct dpif_offload_flow_put *,
-                           uint32_t *flow_mark);
+                           void **previous_flow_reference);
 
     /* Delete the specified flow directly from the offloaded datapath.  See the
      * above 'netdev_flow_put' for implementation details. */
     int (*netdev_flow_del)(const struct dpif_offload *, struct netdev *,
-                           struct dpif_offload_flow_del *,
-                           uint32_t *flow_mark);
+                           struct dpif_offload_flow_del *);
 
     /* Get offload statistics based on the flows 'ufid'.  Note that this API
      * does NOT support asynchronous handling.  Returns 'true' if the flow was
@@ -321,6 +323,12 @@  struct dpif_offload_class {
                               const ovs_u128 *ufid,
                               struct dpif_flow_stats *stats,
                               struct dpif_flow_attrs *attrs);
+
+    /* Registers a callback that is invoked when a flow reference is released
+     * by the offload provider, i.e., when the flow reference previously passed
+     * to netdev_flow_put() is no longer held by the offload provider. */
+    void (*register_flow_unreference_cb)(const struct dpif_offload *,
+                                         dpif_offload_flow_unreference_cb *);
 };
 
 extern struct dpif_offload_class dpif_offload_dummy_class;
diff --git a/lib/dpif-offload-tc.c b/lib/dpif-offload-tc.c
index 259e87029..faca54408 100644
--- a/lib/dpif-offload-tc.c
+++ b/lib/dpif-offload-tc.c
@@ -330,13 +330,6 @@  dpif_offload_tc_netdev_flow_flush_(struct netdev *netdev)
     return netdev_offload_tc_flow_flush(netdev);
 }
 
-static int
-dpif_offload_tc_netdev_flow_flush(const struct dpif_offload *offload
-                                  OVS_UNUSED, struct netdev *netdev)
-{
-    return dpif_offload_tc_netdev_flow_flush_(netdev);
-}
-
 static bool
 dpif_offload_tc_flow_flush_cb(struct dpif_offload_port_mgr_port *port,
                               void *aux)
@@ -877,5 +870,4 @@  struct dpif_offload_class dpif_offload_tc_class = {
     .meter_get = dpif_offload_tc_meter_get,
     .meter_del = dpif_offload_tc_meter_del,
     .get_netdev = dpif_offload_tc_get_netdev,
-    .netdev_flow_flush = dpif_offload_tc_netdev_flow_flush,
 };
diff --git a/lib/dpif-offload.c b/lib/dpif-offload.c
index 666382730..3a14ab92f 100644
--- a/lib/dpif-offload.c
+++ b/lib/dpif-offload.c
@@ -1381,23 +1381,10 @@  dpif_offload_netdev_same_offload(const struct netdev *a,
     return offload_a == offload_b;
 }
 
-int
-dpif_offload_netdev_flush_flows(struct netdev *netdev)
-{
-    const struct dpif_offload *offload;
-
-    offload = ovsrcu_get(const struct dpif_offload *, &netdev->dpif_offload);
-
-    if (offload && offload->class->netdev_flow_flush) {
-        return offload->class->netdev_flow_flush(offload, netdev);
-    }
-    return EOPNOTSUPP;
-}
-
 int
 dpif_offload_datapath_flow_put(const char *dpif_name,
                                struct dpif_offload_flow_put *put,
-                               uint32_t *flow_mark)
+                               void **previous_flow_reference)
 {
     struct dpif_offload *offload;
     struct dp_offload *dp_offload;
@@ -1409,10 +1396,8 @@  dpif_offload_datapath_flow_put(const char *dpif_name,
     ovs_mutex_unlock(&dpif_offload_mutex);
 
     if (OVS_UNLIKELY(!dp_offload)) {
-        if (flow_mark) {
-            *flow_mark = INVALID_FLOW_MARK;
-        }
-        return 0;
+        *previous_flow_reference = NULL;
+        return EOPNOTSUPP;
     }
 
     netdev = dpif_offload_get_netdev_by_port_id_(dp_offload, &offload,
@@ -1420,19 +1405,16 @@  dpif_offload_datapath_flow_put(const char *dpif_name,
 
     if (OVS_LIKELY(netdev && offload->class->netdev_flow_put)) {
         return offload->class->netdev_flow_put(offload, netdev, put,
-                                               flow_mark);
+                                               previous_flow_reference);
     }
 
-    if (flow_mark) {
-        *flow_mark = INVALID_FLOW_MARK;
-    }
-    return 0;
+    *previous_flow_reference = NULL;
+    return EOPNOTSUPP;
 }
 
 int
 dpif_offload_datapath_flow_del(const char *dpif_name,
-                               struct dpif_offload_flow_del *del,
-                               uint32_t *flow_mark)
+                               struct dpif_offload_flow_del *del)
 {
     struct dpif_offload *offload;
     struct dp_offload *dp_offload;
@@ -1444,24 +1426,17 @@  dpif_offload_datapath_flow_del(const char *dpif_name,
     ovs_mutex_unlock(&dpif_offload_mutex);
 
     if (OVS_UNLIKELY(!dp_offload)) {
-        if (flow_mark) {
-            *flow_mark = INVALID_FLOW_MARK;
-        }
-        return 0;
+        return EOPNOTSUPP;
     }
 
     netdev = dpif_offload_get_netdev_by_port_id_(dp_offload, &offload,
                                                  del->in_port);
 
     if (OVS_LIKELY(netdev && offload->class->netdev_flow_del)) {
-        return offload->class->netdev_flow_del(offload, netdev, del,
-                                               flow_mark);
+        return offload->class->netdev_flow_del(offload, netdev, del);
     }
 
-    if (flow_mark) {
-        *flow_mark = INVALID_FLOW_MARK;
-    }
-    return 0;
+    return EOPNOTSUPP;
 }
 
 bool
@@ -1499,7 +1474,9 @@  dpif_offload_datapath_flow_stats(const char *dpif_name, odp_port_t in_port,
 
 int
 dpif_offload_netdev_hw_miss_packet_postprocess(struct netdev *netdev,
-                                               struct dp_packet *packet)
+                                               unsigned pmd_id,
+                                               struct dp_packet *packet,
+                                               void **flow_reference)
 {
     const struct dpif_offload *offload;
     bool postprocess_api_supported;
@@ -1524,7 +1501,8 @@  dpif_offload_netdev_hw_miss_packet_postprocess(struct netdev *netdev,
     }
 
     rc = offload->class->netdev_hw_miss_packet_postprocess(offload, netdev,
-                                                           packet);
+                                                           pmd_id, packet,
+                                                           flow_reference);
     if (rc == EOPNOTSUPP) {
         /* API unsupported by the port; avoid subsequent calls. */
         atomic_store_relaxed(&netdev->hw_info.postprocess_api_supported,
@@ -1533,6 +1511,24 @@  dpif_offload_netdev_hw_miss_packet_postprocess(struct netdev *netdev,
     return rc;
 }
 
+void
+dpif_offload_datapath_register_flow_unreference_cb(
+    struct dpif *dpif, dpif_offload_flow_unreference_cb *cb)
+{
+    struct dp_offload *dp_offload = dpif_offload_get_dp_offload(dpif);
+    const struct dpif_offload *offload;
+
+    /* In this case, we assert to make sure this initialization is done after
+     * the offload providers have been assigned to the dpif. */
+    ovs_assert(dp_offload);
+
+    LIST_FOR_EACH (offload, dpif_list_node, &dp_offload->offload_providers) {
+        if (offload->class->register_flow_unreference_cb) {
+            offload->class->register_flow_unreference_cb(offload, cb);
+        }
+    }
+}
+
 
 struct dpif_offload_port_mgr *
 dpif_offload_port_mgr_init(void)
@@ -1778,57 +1774,3 @@  dpif_offload_port_mgr_port_dump_done(
     free(state);
     return 0;
 }
-
-/* XXX: Temporary functions below, which will be removed once fully
- *      refactored. */
-struct netdev *dpif_netdev_offload_get_netdev_by_port_id(odp_port_t);
-void dpif_netdev_offload_ports_traverse(
-    bool (*cb)(struct netdev *, odp_port_t, void *), void *aux);
-
-struct netdev *
-dpif_netdev_offload_get_netdev_by_port_id(odp_port_t port_no)
-{
-    struct dp_offload *dp_offload;
-    struct dpif dpif;
-
-    ovs_mutex_lock(&dpif_offload_mutex);
-    dp_offload = shash_find_data(&dpif_offload_providers, "netdev@ovs-netdev");
-    ovs_mutex_unlock(&dpif_offload_mutex);
-
-    if (!dp_offload) {
-        return NULL;
-    }
-
-    memset(&dpif, 0, sizeof dpif);
-    ovsrcu_set(&dpif.dp_offload, dp_offload);
-
-    return dpif_offload_get_netdev_by_port_id(&dpif, NULL, port_no);
-}
-
-void
-dpif_netdev_offload_ports_traverse(
-    bool (*cb)(struct netdev *, odp_port_t, void *), void *aux)
-{
-    struct dpif_offload_port_dump dump;
-    struct dp_offload *dp_offload;
-    struct dpif_offload_port port;
-    struct dpif dpif;
-
-    ovs_mutex_lock(&dpif_offload_mutex);
-    dp_offload = shash_find_data(&dpif_offload_providers, "netdev@ovs-netdev");
-    ovs_mutex_unlock(&dpif_offload_mutex);
-
-    if (!dp_offload) {
-        return;
-    }
-
-    memset(&dpif, 0, sizeof dpif);
-    ovsrcu_set(&dpif.dp_offload, dp_offload);
-
-    DPIF_OFFLOAD_PORT_FOR_EACH (&port, &dump, &dpif) {
-        if (cb(port.netdev, port.port_no, aux)) {
-            dpif_offload_port_dump_done(&dump);
-            break;
-        }
-    }
-}
diff --git a/lib/dpif-offload.h b/lib/dpif-offload.h
index 868294474..542db3a94 100644
--- a/lib/dpif-offload.h
+++ b/lib/dpif-offload.h
@@ -150,28 +150,33 @@  int dpif_offload_stats_get(struct dpif *, struct netdev_custom_stats **stats,
 /* Netdev specific function, which can be used in the fast path. */
 bool dpif_offload_netdev_same_offload(const struct netdev *,
                                       const struct netdev *);
-int dpif_offload_netdev_flush_flows(struct netdev *);
 int dpif_offload_netdev_hw_miss_packet_postprocess(struct netdev *,
-                                                   struct dp_packet *);
+                                                   unsigned pmd_id,
+                                                   struct dp_packet *,
+                                                   void **flow_reference);
 
-
-/* Flow modification callback definitions. */
-typedef void dpif_offload_flow_op_cb(void *aux_dp, void *aux_flow,
-                                     struct dpif_flow_stats *stats,
-                                     uint32_t flow_mark, int error);
+/* Flow modification callback definition. */
+typedef void dpif_offload_flow_op_cb(void *aux, struct dpif_flow_stats *stats,
+                                     unsigned pmd_id, void *flow_reference,
+                                     void *old_flow_reference,
+                                     int error);
+
+/* Release flow reference callback definition. */
+typedef void dpif_offload_flow_unreference_cb(unsigned pmd_id,
+                                              void *flow_reference);
 
 /* Supporting structures for flow modification functions. */
 struct dpif_offload_flow_cb_data {
     dpif_offload_flow_op_cb *callback;
-    void *callback_aux_dp;
-    void *callback_aux_flow;
+    void *callback_aux;
 };
 
 struct dpif_offload_flow_put {
-    bool modify;
     odp_port_t in_port;
     odp_port_t orig_in_port;  /* Originating in_port for tunneled packets. */
+    unsigned pmd_id;
     const ovs_u128 *ufid;
+    void *flow_reference;
     struct match *match;
     const struct nlattr *actions;
     size_t actions_len;
@@ -181,7 +186,9 @@  struct dpif_offload_flow_put {
 
 struct dpif_offload_flow_del {
     odp_port_t in_port;
+    unsigned pmd_id;
     const ovs_u128 *ufid;
+    void *flow_reference;
     struct dpif_flow_stats *stats;
     struct dpif_offload_flow_cb_data cb_data;
 };
@@ -189,22 +196,25 @@  struct dpif_offload_flow_del {
 /* Flow modification functions, which can be used in the fast path. */
 int dpif_offload_datapath_flow_put(const char *dpif_name,
                                    struct dpif_offload_flow_put *,
-                                   uint32_t *flow_mark);
+                                   void **previous_flow_reference);
 int dpif_offload_datapath_flow_del(const char *dpif_name,
-                                   struct dpif_offload_flow_del *,
-                                   uint32_t *flow_mark);
+                                   struct dpif_offload_flow_del *);
 bool dpif_offload_datapath_flow_stats(const char *dpif_name,
                                       odp_port_t in_port, const ovs_u128 *ufid,
                                       struct dpif_flow_stats *,
                                       struct dpif_flow_attrs *);
+void dpif_offload_datapath_register_flow_unreference_cb(
+    struct dpif *, dpif_offload_flow_unreference_cb *);
 
 static inline void dpif_offload_datapath_flow_op_continue(
     struct dpif_offload_flow_cb_data *cb, struct dpif_flow_stats *stats,
-    uint32_t flow_mark, int error)
+    unsigned pmd_id, void *flow_reference, void *old_flow_reference,
+    int error)
 {
     if (cb && cb->callback) {
-        cb->callback(cb->callback_aux_dp, cb->callback_aux_flow,
-                      stats, flow_mark, error);
+
+        cb->callback(cb->callback_aux, stats, pmd_id, flow_reference,
+                     old_flow_reference, error);
     }
 }
 
diff --git a/lib/netdev-offload-dpdk.c b/lib/netdev-offload-dpdk.c
index de10a71c4..d0d3bbe66 100644
--- a/lib/netdev-offload-dpdk.c
+++ b/lib/netdev-offload-dpdk.c
@@ -26,6 +26,7 @@ 
 #include "cmap.h"
 #include "dpif-netdev.h"
 #include "dpif-offload.h"
+#include "dpif-offload-dpdk-private.h"
 #include "netdev-offload-dpdk.h"
 #include "netdev-provider.h"
 #include "netdev-vport.h"
@@ -39,13 +40,6 @@ 
 VLOG_DEFINE_THIS_MODULE(netdev_offload_dpdk);
 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(600, 600);
 
-/* XXX: Temporarily external declarations, will be removed during cleanup. */
-unsigned int dpdk_offload_thread_nb(void);
-unsigned int dpdk_offload_thread_id(void);
-struct netdev *dpif_netdev_offload_get_netdev_by_port_id(odp_port_t);
-void dpif_netdev_offload_ports_traverse(
-    bool (*cb)(struct netdev *, odp_port_t, void *), void *aux);
-
 /* Thread-safety
  * =============
  *
@@ -61,37 +55,202 @@  void dpif_netdev_offload_ports_traverse(
  * read-locking the datapath 'port_rwlock' in lib/dpif-netdev.c.  */
 
 /*
- * A mapping from ufid to dpdk rte_flow.
+ * A mapping from pmd_id to flow_reference.
  */
+struct pmd_id_to_flow_ref_data {
+    struct cmap_node node;
+    void *flow_reference;
+    unsigned pmd_id;
+};
 
+struct pmd_data {
+    struct cmap pmd_id_to_flow_ref;
+    struct ovs_mutex map_lock;
+};
+
+/*
+ * A mapping from ufid to dpdk rte_flow.
+ */
 struct ufid_to_rte_flow_data {
     struct cmap_node node;
+    struct cmap_node mark_node;
     ovs_u128 ufid;
     struct netdev *netdev;
     struct rte_flow *rte_flow;
+    OVSRCU_TYPE(struct pmd_data *) pmd_mapping;
     bool actions_offloaded;
     struct dpif_flow_stats stats;
     struct netdev *physdev;
     struct ovs_mutex lock;
     unsigned int creation_tid;
+    uint32_t flow_mark;
     bool dead;
 };
 
 struct netdev_offload_dpdk_data {
     struct cmap ufid_to_rte_flow;
+    struct cmap mark_to_rte_flow;
     uint64_t *rte_flow_counters;
     struct ovs_mutex map_lock;
 };
 
+static struct pmd_data *
+netdev_offload_dpdk_pmd_data_init(void)
+{
+    struct pmd_data *mapping = xmalloc(sizeof *mapping);
+
+    ovs_mutex_init(&mapping->map_lock);
+    cmap_init(&mapping->pmd_id_to_flow_ref);
+    return mapping;
+}
+
+static void
+netdev_offload_dpdk_pmd_data_associate(struct pmd_data *mapping,
+                                       unsigned pmd_id, void *flow_reference)
+{
+    struct pmd_id_to_flow_ref_data *pmd_data = xmalloc(sizeof *pmd_data);
+
+    pmd_data->flow_reference = flow_reference;
+    pmd_data->pmd_id = pmd_id;
+
+    ovs_mutex_lock(&mapping->map_lock);
+    cmap_insert(&mapping->pmd_id_to_flow_ref, &pmd_data->node,
+                hash_int(pmd_id, 0));
+    ovs_mutex_unlock(&mapping->map_lock);
+}
+
+static void
+netdev_offload_dpdk_pmd_data_disassociate(struct pmd_data *mapping,
+                                          unsigned pmd_id)
+{
+    struct pmd_id_to_flow_ref_data *data;
+    size_t hash = hash_int(pmd_id, 0);
+
+    ovs_mutex_lock(&mapping->map_lock);
+
+    CMAP_FOR_EACH_WITH_HASH_PROTECTED (data, node, hash,
+                                       &mapping->pmd_id_to_flow_ref) {
+        if (data->pmd_id == pmd_id) {
+            cmap_remove(&mapping->pmd_id_to_flow_ref, &data->node, hash);
+            ovsrcu_postpone(free, data);
+            break;
+        }
+    }
+
+    ovs_mutex_unlock(&mapping->map_lock);
+}
+
+static struct pmd_id_to_flow_ref_data *
+netdev_offload_dpdk_pmd_data_get_data(
+    const struct ufid_to_rte_flow_data *flow_data, unsigned pmd_id)
+{
+    struct pmd_id_to_flow_ref_data *data;
+    size_t hash = hash_int(pmd_id, 0);
+    struct pmd_data *mapping;
+
+    mapping = ovsrcu_get(struct pmd_data *, &flow_data->pmd_mapping);
+    if (!mapping) {
+        return NULL;
+    }
+
+    CMAP_FOR_EACH_WITH_HASH (data, node, hash,
+                             &mapping->pmd_id_to_flow_ref) {
+        if (data->pmd_id == pmd_id) {
+            return data;
+        }
+    }
+    return NULL;
+}
+
+static bool
+netdev_offload_dpdk_pmd_data_pmd_update(
+    const struct ufid_to_rte_flow_data *flow_data, unsigned pmd_id,
+    void *new_flow_reference, void **previous_flow_reference)
+{
+    struct pmd_id_to_flow_ref_data *data;
+
+    data = netdev_offload_dpdk_pmd_data_get_data(flow_data, pmd_id);
+    if (!data) {
+        return false;
+    }
+
+    *previous_flow_reference = data->flow_reference;
+    data->flow_reference = new_flow_reference;
+    return true;
+}
+
+static bool
+netdev_offload_dpdk_pmd_data_find_pmd_and_delete(
+    const struct ufid_to_rte_flow_data *flow_data, unsigned pmd_id,
+    void *flow_reference)
+{
+    struct pmd_data *mapping = ovsrcu_get(struct pmd_data *,
+                                          &flow_data->pmd_mapping);
+    struct pmd_id_to_flow_ref_data *data;
+    size_t hash = hash_int(pmd_id, 0);
+
+    ovs_assert(mapping);
+    ovs_mutex_lock(&mapping->map_lock);
+
+    CMAP_FOR_EACH_WITH_HASH_PROTECTED (data, node, hash,
+                                       &mapping->pmd_id_to_flow_ref) {
+        if (data->pmd_id == pmd_id && data->flow_reference == flow_reference) {
+            cmap_remove(&mapping->pmd_id_to_flow_ref, &data->node, hash);
+            ovsrcu_postpone(free, data);
+
+            ovs_mutex_unlock(&mapping->map_lock);
+            return true;
+        }
+    }
+
+    ovs_mutex_unlock(&mapping->map_lock);
+    return false;
+}
+
+static void
+netdev_offload_dpdk_pmd_data_cleanup_mappings(
+    struct dpif_offload_dpdk *offload, struct pmd_data *mapping)
+{
+    struct pmd_id_to_flow_ref_data *data;
+
+    if (!mapping) {
+        return;
+    }
+
+    ovs_mutex_lock(&mapping->map_lock);
+
+    CMAP_FOR_EACH (data, node, &mapping->pmd_id_to_flow_ref) {
+        cmap_remove(&mapping->pmd_id_to_flow_ref, &data->node,
+                    hash_int(data->pmd_id, 0));
+
+        dpif_offload_dpdk_flow_unreference(offload, data->pmd_id,
+                                           data->flow_reference);
+        ovsrcu_postpone(free, data);
+    }
+
+    ovs_mutex_unlock(&mapping->map_lock);
+}
+
+static void
+netdev_offload_dpdk_pmd_data_cleanup_mapping(struct pmd_data *mapping)
+{
+    if (mapping) {
+        ovs_mutex_destroy(&mapping->map_lock);
+        cmap_destroy(&mapping->pmd_id_to_flow_ref);
+        free(mapping);
+    }
+}
+
 static int
-offload_data_init(struct netdev *netdev)
+offload_data_init(struct netdev *netdev, unsigned int offload_thread_count)
 {
     struct netdev_offload_dpdk_data *data;
 
     data = xzalloc(sizeof *data);
     ovs_mutex_init(&data->map_lock);
     cmap_init(&data->ufid_to_rte_flow);
-    data->rte_flow_counters = xcalloc(dpdk_offload_thread_nb(),
+    cmap_init(&data->mark_to_rte_flow);
+    data->rte_flow_counters = xcalloc(offload_thread_count,
                                       sizeof *data->rte_flow_counters);
 
     ovsrcu_set(&netdev->hw_info.offload_data, (void *) data);
@@ -130,6 +289,7 @@  offload_data_destroy(struct netdev *netdev)
     }
 
     cmap_destroy(&data->ufid_to_rte_flow);
+    cmap_destroy(&data->mark_to_rte_flow);
     ovsrcu_postpone(offload_data_destroy__, data);
 
     ovsrcu_set(&netdev->hw_info.offload_data, NULL);
@@ -174,10 +334,38 @@  offload_data_map(struct netdev *netdev)
     return data ? &data->ufid_to_rte_flow : NULL;
 }
 
+static bool
+offload_data_maps(struct netdev *netdev, struct cmap **ufid_map,
+                  struct cmap **mark_map)
+{
+    struct netdev_offload_dpdk_data *data;
+
+    data = (struct netdev_offload_dpdk_data *)
+        ovsrcu_get(void *, &netdev->hw_info.offload_data);
+
+    if (!data) {
+        return false;
+    }
+
+    *ufid_map = &data->ufid_to_rte_flow;
+    *mark_map = &data->mark_to_rte_flow;
+    return true;
+}
+
+static struct cmap *
+offload_data_mark_map(struct netdev *netdev)
+{
+    struct netdev_offload_dpdk_data *data;
+
+    data = (struct netdev_offload_dpdk_data *)
+        ovsrcu_get(void *, &netdev->hw_info.offload_data);
+
+    return data ? &data->mark_to_rte_flow : NULL;
+}
+
 /* Find rte_flow with @ufid. */
 static struct ufid_to_rte_flow_data *
-ufid_to_rte_flow_data_find(struct netdev *netdev,
-                           const ovs_u128 *ufid, bool warn)
+ufid_to_rte_flow_data_find(struct netdev *netdev, const ovs_u128 *ufid)
 {
     size_t hash = hash_bytes(ufid, sizeof *ufid, 0);
     struct ufid_to_rte_flow_data *data;
@@ -193,12 +381,26 @@  ufid_to_rte_flow_data_find(struct netdev *netdev,
         }
     }
 
-    if (warn) {
-        VLOG_WARN("ufid "UUID_FMT" is not associated with an rte flow",
-                  UUID_ARGS((struct uuid *) ufid));
+    return NULL;
+}
+
+static struct ufid_to_rte_flow_data *
+ufid_to_rte_flow_data_find_pmd_and_update(struct netdev *netdev,
+                                          const ovs_u128 *ufid,
+                                          unsigned pmd_id, bool *found_pmd,
+                                          void *new_flow_reference,
+                                          void **previous_flow_reference)
+{
+    struct ufid_to_rte_flow_data *data = ufid_to_rte_flow_data_find(netdev,
+                                                                    ufid);
+    if (data) {
+        *found_pmd = netdev_offload_dpdk_pmd_data_pmd_update(
+            data, pmd_id, new_flow_reference, previous_flow_reference);
+    } else {
+        *found_pmd = false;
     }
 
-    return NULL;
+    return data;
 }
 
 /* Find rte_flow with @ufid, lock-protected. */
@@ -219,17 +421,38 @@  ufid_to_rte_flow_data_find_protected(struct netdev *netdev,
     return NULL;
 }
 
+/* Find rte_flow with @flow_mark. */
+static struct ufid_to_rte_flow_data *
+mark_to_rte_flow_data_find(struct netdev *netdev, uint32_t flow_mark)
+{
+    size_t hash = hash_int(flow_mark, 0);
+    struct ufid_to_rte_flow_data *data;
+    struct cmap *mark_map = offload_data_mark_map(netdev);
+
+    if (!mark_map) {
+        return NULL;
+    }
+
+    CMAP_FOR_EACH_WITH_HASH (data, mark_node, hash, mark_map) {
+        if (data->flow_mark == flow_mark) {
+            return data;
+        }
+    }
+    return NULL;
+}
+
 static inline struct ufid_to_rte_flow_data *
 ufid_to_rte_flow_associate(const ovs_u128 *ufid, struct netdev *netdev,
                            struct netdev *physdev, struct rte_flow *rte_flow,
-                           bool actions_offloaded)
+                           bool actions_offloaded, uint32_t flow_mark,
+                           struct pmd_data *pmd_mapping)
 {
     size_t hash = hash_bytes(ufid, sizeof *ufid, 0);
-    struct cmap *map = offload_data_map(netdev);
     struct ufid_to_rte_flow_data *data_prev;
     struct ufid_to_rte_flow_data *data;
+    struct cmap *map, *mark_map;
 
-    if (!map) {
+    if (!offload_data_maps(netdev, &map, &mark_map)) {
         return NULL;
     }
 
@@ -254,9 +477,13 @@  ufid_to_rte_flow_associate(const ovs_u128 *ufid, struct netdev *netdev,
     data->rte_flow = rte_flow;
     data->actions_offloaded = actions_offloaded;
     data->creation_tid = dpdk_offload_thread_id();
+    data->flow_mark = flow_mark;
     ovs_mutex_init(&data->lock);
+    ovsrcu_set(&data->pmd_mapping, pmd_mapping);
 
     cmap_insert(map, CONST_CAST(struct cmap_node *, &data->node), hash);
+    cmap_insert(mark_map, CONST_CAST(struct cmap_node *, &data->mark_node),
+                hash_int(flow_mark, 0));
 
     offload_data_unlock(netdev);
     return data;
@@ -265,6 +492,10 @@  ufid_to_rte_flow_associate(const ovs_u128 *ufid, struct netdev *netdev,
 static void
 rte_flow_data_unref(struct ufid_to_rte_flow_data *data)
 {
+    struct pmd_data *pmd_mapping = ovsrcu_get(struct pmd_data *,
+                                              &data->pmd_mapping);
+
+    netdev_offload_dpdk_pmd_data_cleanup_mapping(pmd_mapping);
     ovs_mutex_destroy(&data->lock);
     free(data);
 }
@@ -274,20 +505,28 @@  ufid_to_rte_flow_disassociate(struct ufid_to_rte_flow_data *data)
     OVS_REQUIRES(data->lock)
 {
     size_t hash = hash_bytes(&data->ufid, sizeof data->ufid, 0);
-    struct cmap *map = offload_data_map(data->netdev);
+    struct pmd_data *pmd_mapping;
+    struct cmap *map, *mark_map;
 
-    if (!map) {
+    if (!offload_data_maps(data->netdev, &map, &mark_map)) {
         return;
     }
 
     offload_data_lock(data->netdev);
     cmap_remove(map, CONST_CAST(struct cmap_node *, &data->node), hash);
+    cmap_remove(mark_map, CONST_CAST(struct cmap_node *, &data->mark_node),
+                hash_int(data->flow_mark, 0));
     offload_data_unlock(data->netdev);
 
     if (data->netdev != data->physdev) {
         netdev_close(data->netdev);
     }
     netdev_close(data->physdev);
+
+    /* There should be no more users before removing the hw flow. */
+    pmd_mapping = ovsrcu_get(struct pmd_data *, &data->pmd_mapping);
+    ovs_assert(!pmd_mapping || !cmap_count(&pmd_mapping->pmd_id_to_flow_ref));
+
     ovsrcu_postpone(rte_flow_data_unref, data);
 }
 
@@ -1153,7 +1392,8 @@  vport_to_rte_tunnel(struct netdev *vport,
 }
 
 static int
-add_vport_match(struct flow_patterns *patterns,
+add_vport_match(struct dpif_offload_dpdk *offload,
+                struct flow_patterns *patterns,
                 odp_port_t orig_in_port,
                 struct netdev *tnldev)
 {
@@ -1164,7 +1404,7 @@  add_vport_match(struct flow_patterns *patterns,
     struct netdev *physdev;
     int ret;
 
-    physdev = dpif_netdev_offload_get_netdev_by_port_id(orig_in_port);
+    physdev = dpif_offload_dpdk_get_netdev(offload, orig_in_port);
     if (physdev == NULL) {
         return -1;
     }
@@ -1374,14 +1614,15 @@  parse_gre_match(struct flow_patterns *patterns,
 }
 
 static int OVS_UNUSED
-parse_flow_tnl_match(struct netdev *tnldev,
+parse_flow_tnl_match(struct dpif_offload_dpdk *offload,
+                     struct netdev *tnldev,
                      struct flow_patterns *patterns,
                      odp_port_t orig_in_port,
                      struct match *match)
 {
     int ret;
 
-    ret = add_vport_match(patterns, orig_in_port, tnldev);
+    ret = add_vport_match(offload, patterns, orig_in_port, tnldev);
     if (ret) {
         return ret;
     }
@@ -1397,7 +1638,8 @@  parse_flow_tnl_match(struct netdev *tnldev,
 }
 
 static int
-parse_flow_match(struct netdev *netdev,
+parse_flow_match(struct dpif_offload_dpdk *offload OVS_UNUSED,
+                 struct netdev *netdev,
                  odp_port_t orig_in_port OVS_UNUSED,
                  struct flow_patterns *patterns,
                  struct match *match)
@@ -1415,7 +1657,7 @@  parse_flow_match(struct netdev *netdev,
     patterns->physdev = netdev;
 #ifdef ALLOW_EXPERIMENTAL_API /* Packet restoration API required. */
     if (netdev_vport_is_vport_class(netdev->netdev_class) &&
-        parse_flow_tnl_match(netdev, patterns, orig_in_port, match)) {
+        parse_flow_tnl_match(offload, netdev, patterns, orig_in_port, match)) {
         return -1;
     }
 #endif
@@ -1837,7 +2079,8 @@  add_represented_port_action(struct flow_actions *actions,
 }
 
 static int
-add_output_action(struct netdev *netdev,
+add_output_action(struct dpif_offload_dpdk *offload,
+                  struct netdev *netdev,
                   struct flow_actions *actions,
                   const struct nlattr *nla)
 {
@@ -1846,7 +2089,7 @@  add_output_action(struct netdev *netdev,
     int ret = 0;
 
     port = nl_attr_get_odp_port(nla);
-    outdev = dpif_netdev_offload_get_netdev_by_port_id(port);
+    outdev = dpif_offload_dpdk_get_netdev(offload, port);
     if (outdev == NULL) {
         VLOG_DBG_RL(&rl, "Cannot find netdev for odp port %"PRIu32, port);
         return -1;
@@ -2128,7 +2371,8 @@  add_tunnel_push_action(struct flow_actions *actions,
 }
 
 static int
-parse_clone_actions(struct netdev *netdev,
+parse_clone_actions(struct dpif_offload_dpdk *offload,
+                    struct netdev *netdev,
                     struct flow_actions *actions,
                     const struct nlattr *clone_actions,
                     const size_t clone_actions_len)
@@ -2143,7 +2387,7 @@  parse_clone_actions(struct netdev *netdev,
             const struct ovs_action_push_tnl *tnl_push = nl_attr_get(ca);
             add_tunnel_push_action(actions, tnl_push);
         } else if (clone_type == OVS_ACTION_ATTR_OUTPUT) {
-            if (add_output_action(netdev, actions, ca)) {
+            if (add_output_action(offload, netdev, actions, ca)) {
                 return -1;
             }
         } else if (clone_type == OVS_ACTION_ATTR_PUSH_VLAN) {
@@ -2169,7 +2413,8 @@  add_jump_action(struct flow_actions *actions, uint32_t group)
 }
 
 static int OVS_UNUSED
-add_tnl_pop_action(struct netdev *netdev,
+add_tnl_pop_action(struct dpif_offload_dpdk *offload,
+                   struct netdev *netdev,
                    struct flow_actions *actions,
                    const struct nlattr *nla)
 {
@@ -2182,7 +2427,7 @@  add_tnl_pop_action(struct netdev *netdev,
     int ret;
 
     port = nl_attr_get_odp_port(nla);
-    vport = dpif_netdev_offload_get_netdev_by_port_id(port);
+    vport = dpif_offload_dpdk_get_netdev(offload, port);
     if (vport == NULL) {
         return -1;
     }
@@ -2212,7 +2457,8 @@  add_tnl_pop_action(struct netdev *netdev,
 }
 
 static int
-parse_flow_actions(struct netdev *netdev,
+parse_flow_actions(struct dpif_offload_dpdk *offload,
+                   struct netdev *netdev,
                    struct flow_actions *actions,
                    struct nlattr *nl_actions,
                    size_t nl_actions_len)
@@ -2223,7 +2469,7 @@  parse_flow_actions(struct netdev *netdev,
     add_count_action(actions);
     NL_ATTR_FOR_EACH_UNSAFE (nla, left, nl_actions, nl_actions_len) {
         if (nl_attr_type(nla) == OVS_ACTION_ATTR_OUTPUT) {
-            if (add_output_action(netdev, actions, nla)) {
+            if (add_output_action(offload, netdev, actions, nla)) {
                 return -1;
             }
         } else if (nl_attr_type(nla) == OVS_ACTION_ATTR_DROP) {
@@ -2253,13 +2499,13 @@  parse_flow_actions(struct netdev *netdev,
             const struct nlattr *clone_actions = nl_attr_get(nla);
             size_t clone_actions_len = nl_attr_get_size(nla);
 
-            if (parse_clone_actions(netdev, actions, clone_actions,
+            if (parse_clone_actions(offload, netdev, actions, clone_actions,
                                     clone_actions_len)) {
                 return -1;
             }
 #ifdef ALLOW_EXPERIMENTAL_API /* Packet restoration API required. */
         } else if (nl_attr_type(nla) == OVS_ACTION_ATTR_TUNNEL_POP) {
-            if (add_tnl_pop_action(netdev, actions, nla)) {
+            if (add_tnl_pop_action(offload, netdev, actions, nla)) {
                 return -1;
             }
 #endif
@@ -2279,7 +2525,8 @@  parse_flow_actions(struct netdev *netdev,
 }
 
 static struct rte_flow *
-netdev_offload_dpdk_actions(struct netdev *netdev,
+netdev_offload_dpdk_actions(struct dpif_offload_dpdk *offload,
+                            struct netdev *netdev,
                             struct flow_patterns *patterns,
                             struct nlattr *nl_actions,
                             size_t actions_len)
@@ -2294,7 +2541,8 @@  netdev_offload_dpdk_actions(struct netdev *netdev,
     struct rte_flow_error error;
     int ret;
 
-    ret = parse_flow_actions(netdev, &actions, nl_actions, actions_len);
+    ret = parse_flow_actions(offload, netdev, &actions, nl_actions,
+                             actions_len);
     if (ret) {
         goto out;
     }
@@ -2306,12 +2554,15 @@  out:
 }
 
 static struct ufid_to_rte_flow_data *
-netdev_offload_dpdk_add_flow(struct netdev *netdev,
+netdev_offload_dpdk_add_flow(struct dpif_offload_dpdk *offload,
+                             struct pmd_data *pmd_mapping,
+                             struct netdev *netdev,
                              struct match *match,
                              struct nlattr *nl_actions,
                              size_t actions_len,
                              const ovs_u128 *ufid,
-                             struct dpif_netdev_offload_info *info)
+                             uint32_t flow_mark,
+                             odp_port_t orig_in_port)
 {
     struct flow_patterns patterns = {
         .items = NULL,
@@ -2322,20 +2573,20 @@  netdev_offload_dpdk_add_flow(struct netdev *netdev,
     bool actions_offloaded = true;
     struct rte_flow *flow;
 
-    if (parse_flow_match(netdev, info->orig_in_port, &patterns, match)) {
+    if (parse_flow_match(offload, netdev, orig_in_port, &patterns, match)) {
         VLOG_DBG_RL(&rl, "%s: matches of ufid "UUID_FMT" are not supported",
                     netdev_get_name(netdev), UUID_ARGS((struct uuid *) ufid));
         goto out;
     }
 
-    flow = netdev_offload_dpdk_actions(patterns.physdev, &patterns, nl_actions,
-                                       actions_len);
+    flow = netdev_offload_dpdk_actions(offload, patterns.physdev, &patterns,
+                                       nl_actions, actions_len);
     if (!flow && !netdev_vport_is_vport_class(netdev->netdev_class)) {
         /* If we failed to offload the rule actions fallback to MARK+RSS
          * actions.
          */
         flow = netdev_offload_dpdk_mark_rss(&patterns, netdev,
-                                            info->flow_mark);
+                                            flow_mark);
         actions_offloaded = false;
     }
 
@@ -2343,7 +2594,8 @@  netdev_offload_dpdk_add_flow(struct netdev *netdev,
         goto out;
     }
     flows_data = ufid_to_rte_flow_associate(ufid, netdev, patterns.physdev,
-                                            flow, actions_offloaded);
+                                            flow, actions_offloaded,
+                                            flow_mark, pmd_mapping);
     VLOG_DBG("%s/%s: installed flow %p by ufid "UUID_FMT,
              netdev_get_name(netdev), netdev_get_name(patterns.physdev), flow,
              UUID_ARGS((struct uuid *) ufid));
@@ -2354,8 +2606,11 @@  out:
 }
 
 static int
-netdev_offload_dpdk_flow_destroy(struct ufid_to_rte_flow_data *rte_flow_data)
+netdev_offload_dpdk_flow_destroy(struct dpif_offload_dpdk *offload,
+                                 struct ufid_to_rte_flow_data *rte_flow_data,
+                                 bool force_destroy, bool keep_flow_mark)
 {
+    struct pmd_data *pmd_mapping;
     struct rte_flow_error error;
     struct rte_flow *rte_flow;
     struct netdev *physdev;
@@ -2365,11 +2620,21 @@  netdev_offload_dpdk_flow_destroy(struct ufid_to_rte_flow_data *rte_flow_data)
 
     ovs_mutex_lock(&rte_flow_data->lock);
 
-    if (rte_flow_data->dead) {
+    pmd_mapping = ovsrcu_get(struct pmd_data *, &rte_flow_data->pmd_mapping);
+
+    /* Only delete the flow from HW if no PMDs are using it, and it's not a
+     * forceful destroy. */
+    if (rte_flow_data->dead
+        || (!force_destroy && pmd_mapping
+            && cmap_count(&pmd_mapping->pmd_id_to_flow_ref))) {
         ovs_mutex_unlock(&rte_flow_data->lock);
         return 0;
     }
 
+    if (force_destroy) {
+        netdev_offload_dpdk_pmd_data_cleanup_mappings(offload, pmd_mapping);
+    }
+
     rte_flow_data->dead = true;
 
     rte_flow = rte_flow_data->rte_flow;
@@ -2388,12 +2653,22 @@  netdev_offload_dpdk_flow_destroy(struct ufid_to_rte_flow_data *rte_flow_data)
         data->rte_flow_counters[tid]--;
 
         VLOG_DBG_RL(&rl, "%s/%s: rte_flow 0x%"PRIxPTR
-                    " flow destroy %d ufid " UUID_FMT,
+                    " flow %sdestroy %d ufid " UUID_FMT,
                     netdev_get_name(netdev), netdev_get_name(physdev),
                     (intptr_t) rte_flow,
+                    force_destroy ? "force " : "",
                     netdev_dpdk_get_port_id(physdev),
                     UUID_ARGS((struct uuid *) ufid));
+
         ufid_to_rte_flow_disassociate(rte_flow_data);
+
+        if (!keep_flow_mark) {
+            /* Do this after ufid_to_rte_flow_disassociate(), as it needs a
+             * valid flow mark to do its work. */
+            dpif_offload_dpdk_free_flow_mark(offload,
+                                             rte_flow_data->flow_mark);
+            rte_flow_data->flow_mark = INVALID_FLOW_MARK;
+        }
     } else {
         VLOG_ERR("Failed flow: %s/%s: flow destroy %d ufid " UUID_FMT,
                  netdev_get_name(netdev), netdev_get_name(physdev),
@@ -2426,24 +2701,31 @@  get_netdev_odp_cb(struct netdev *netdev,
 }
 
 int
-netdev_offload_dpdk_flow_put(struct netdev *netdev, struct match *match,
+netdev_offload_dpdk_flow_put(struct dpif_offload_dpdk *offload,
+                             unsigned pmd_id, void *flow_reference,
+                             struct netdev *netdev, struct match *match,
                              struct nlattr *actions, size_t actions_len,
-                             const ovs_u128 *ufid,
-                             struct dpif_netdev_offload_info *info,
+                             const ovs_u128 *ufid, odp_port_t orig_in_port,
+                             void **previous_flow_reference,
                              struct dpif_flow_stats *stats)
 {
     struct ufid_to_rte_flow_data *rte_flow_data;
     struct dpif_flow_stats old_stats;
+    struct pmd_data *pmd_mapping;
     bool modification = false;
+    uint32_t flow_mark;
+    bool pmd_exists;
     int ret;
 
-    /*
-     * If an old rte_flow exists, it means it's a flow modification.
-     * Here destroy the old rte flow first before adding a new one.
-     * Keep the stats for the newly created rule.
+    /* If an old rte_flow exists for this pmd_id, it means it's a flow
+     * modification.  Here destroy the old rte flow first before adding a
+     * new one.  Keep the stats and pmd_mapping for the newly created rule.
      */
-    rte_flow_data = ufid_to_rte_flow_data_find(netdev, ufid, false);
-    if (rte_flow_data && rte_flow_data->rte_flow) {
+    rte_flow_data = ufid_to_rte_flow_data_find_pmd_and_update(
+        netdev, ufid, pmd_id, &pmd_exists, flow_reference,
+        previous_flow_reference);
+
+    if (rte_flow_data && rte_flow_data->rte_flow && pmd_exists) {
         struct get_netdev_odp_aux aux = {
             .netdev = rte_flow_data->physdev,
             .odp_port = ODPP_NONE,
@@ -2452,21 +2734,54 @@  netdev_offload_dpdk_flow_put(struct netdev *netdev, struct match *match,
         /* Extract the orig_in_port from physdev as in case of modify the one
          * provided by upper layer cannot be used.
          */
-        dpif_netdev_offload_ports_traverse(get_netdev_odp_cb, &aux);
-        info->orig_in_port = aux.odp_port;
+        dpif_offload_dpdk_traverse_ports(offload, get_netdev_odp_cb, &aux);
+        orig_in_port = aux.odp_port;
         old_stats = rte_flow_data->stats;
         modification = true;
-        ret = netdev_offload_dpdk_flow_destroy(rte_flow_data);
+        pmd_mapping = ovsrcu_get(struct pmd_data *,
+                                 &rte_flow_data->pmd_mapping);
+        ovsrcu_set(&rte_flow_data->pmd_mapping, NULL);
+        flow_mark = rte_flow_data->flow_mark;
+
+        ret = netdev_offload_dpdk_flow_destroy(offload, rte_flow_data,
+                                               false, true);
         if (ret < 0) {
             return ret;
         }
+    } else if (!rte_flow_data) {
+        pmd_mapping = netdev_offload_dpdk_pmd_data_init();
+        netdev_offload_dpdk_pmd_data_associate(pmd_mapping, pmd_id,
+                                               flow_reference);
+        *previous_flow_reference = NULL;
+        flow_mark = dpif_offload_dpdk_allocate_flow_mark(offload);
+    } else /* if (rte_flow_data) */ {
+        pmd_mapping = ovsrcu_get(struct pmd_data *,
+                                 &rte_flow_data->pmd_mapping);
+
+        netdev_offload_dpdk_pmd_data_associate(pmd_mapping, pmd_id,
+                                               flow_reference);
+        *previous_flow_reference = NULL;
+    }
+
+    if (modification || !rte_flow_data) {
+        rte_flow_data = netdev_offload_dpdk_add_flow(offload, pmd_mapping,
+                                                     netdev, match,
+                                                     actions, actions_len,
+                                                     ufid, flow_mark,
+                                                     orig_in_port);
+        if (!rte_flow_data) {
+            /* Clean up existing mappings, except for the current pmd_id one,
+             * as this is handled through the callback. */
+            netdev_offload_dpdk_pmd_data_disassociate(pmd_mapping, pmd_id);
+            netdev_offload_dpdk_pmd_data_cleanup_mappings(offload,
+                                                          pmd_mapping);
+            ovsrcu_postpone(netdev_offload_dpdk_pmd_data_cleanup_mapping,
+                            pmd_mapping);
+            dpif_offload_dpdk_free_flow_mark(offload, flow_mark);
+            return -1;
+        }
     }
 
-    rte_flow_data = netdev_offload_dpdk_add_flow(netdev, match, actions,
-                                                 actions_len, ufid, info);
-    if (!rte_flow_data) {
-        return -1;
-    }
     if (modification) {
         rte_flow_data->stats = old_stats;
     }
@@ -2477,25 +2792,35 @@  netdev_offload_dpdk_flow_put(struct netdev *netdev, struct match *match,
 }
 
 int
-netdev_offload_dpdk_flow_del(struct netdev *netdev OVS_UNUSED,
+netdev_offload_dpdk_flow_del(struct dpif_offload_dpdk *offload,
+                             struct netdev *netdev, unsigned pmd_id,
                              const ovs_u128 *ufid,
+                             void *flow_reference,
                              struct dpif_flow_stats *stats)
 {
     struct ufid_to_rte_flow_data *rte_flow_data;
 
-    rte_flow_data = ufid_to_rte_flow_data_find(netdev, ufid, true);
+    rte_flow_data = ufid_to_rte_flow_data_find(netdev, ufid);
     if (!rte_flow_data || !rte_flow_data->rte_flow) {
         return -1;
     }
 
+    if (!netdev_offload_dpdk_pmd_data_find_pmd_and_delete(rte_flow_data,
+                                                          pmd_id,
+                                                          flow_reference)) {
+        return -1;
+    }
+
     if (stats) {
         memset(stats, 0, sizeof *stats);
     }
-    return netdev_offload_dpdk_flow_destroy(rte_flow_data);
+    return netdev_offload_dpdk_flow_destroy(offload, rte_flow_data,
+                                            false, false);
 }
 
 int
-netdev_offload_dpdk_init(struct netdev *netdev)
+netdev_offload_dpdk_init(struct netdev *netdev,
+                         unsigned int offload_thread_count)
 {
     int ret = EOPNOTSUPP;
 
@@ -2507,7 +2832,7 @@  netdev_offload_dpdk_init(struct netdev *netdev)
     }
 
     if (netdev_dpdk_flow_api_supported(netdev, false)) {
-        ret = offload_data_init(netdev);
+        ret = offload_data_init(netdev, offload_thread_count);
     }
 
     return ret;
@@ -2537,7 +2862,7 @@  netdev_offload_dpdk_flow_get(struct netdev *netdev,
 
     attrs->dp_extra_info = NULL;
 
-    rte_flow_data = ufid_to_rte_flow_data_find(netdev, ufid, false);
+    rte_flow_data = ufid_to_rte_flow_data_find(netdev, ufid);
     if (!rte_flow_data || !rte_flow_data->rte_flow ||
         rte_flow_data->dead || ovs_mutex_trylock(&rte_flow_data->lock)) {
         return -1;
@@ -2580,7 +2905,8 @@  out:
 }
 
 static void
-flush_netdev_flows_in_related(struct netdev *netdev, struct netdev *related)
+flush_netdev_flows_in_related(struct dpif_offload_dpdk *offload,
+                              struct netdev *netdev, struct netdev *related)
 {
     unsigned int tid = dpdk_offload_thread_id();
     struct cmap *map = offload_data_map(related);
@@ -2595,33 +2921,43 @@  flush_netdev_flows_in_related(struct netdev *netdev, struct netdev *related)
             continue;
         }
         if (data->creation_tid == tid) {
-            netdev_offload_dpdk_flow_destroy(data);
+            netdev_offload_dpdk_flow_destroy(offload, data, true, false);
         }
     }
 }
+struct flush_in_vport_aux {
+    struct dpif_offload_dpdk *offload;
+    struct netdev *netdev;
+};
 
 static bool
 flush_in_vport_cb(struct netdev *vport,
                   odp_port_t odp_port OVS_UNUSED,
-                  void *aux)
+                  void *aux_)
 {
-    struct netdev *netdev = aux;
+    struct flush_in_vport_aux *aux = aux_;
 
     /* Only vports are related to physical devices. */
     if (netdev_vport_is_vport_class(vport->netdev_class)) {
-        flush_netdev_flows_in_related(netdev, vport);
+        flush_netdev_flows_in_related(aux->offload, aux->netdev, vport);
     }
 
     return false;
 }
 
 int
-netdev_offload_dpdk_flow_flush(struct netdev *netdev)
+netdev_offload_dpdk_flow_flush(struct dpif_offload_dpdk *offload,
+                               struct netdev *netdev)
 {
-    flush_netdev_flows_in_related(netdev, netdev);
+    flush_netdev_flows_in_related(offload, netdev, netdev);
 
     if (!netdev_vport_is_vport_class(netdev->netdev_class)) {
-        dpif_netdev_offload_ports_traverse(flush_in_vport_cb, netdev);
+        struct flush_in_vport_aux aux = {
+            .offload = offload,
+            .netdev = netdev
+        };
+
+        dpif_offload_dpdk_traverse_ports(offload, flush_in_vport_cb, &aux);
     }
 
     return 0;
@@ -2669,7 +3005,8 @@  out:
 }
 
 static struct netdev *
-get_vport_netdev(struct rte_flow_tunnel *tunnel,
+get_vport_netdev(struct dpif_offload_dpdk *offload,
+                 struct rte_flow_tunnel *tunnel,
                  odp_port_t *odp_port)
 {
     struct get_vport_netdev_aux aux = {
@@ -2684,23 +3021,39 @@  get_vport_netdev(struct rte_flow_tunnel *tunnel,
     } else if (tunnel->type == RTE_FLOW_ITEM_TYPE_GRE) {
         aux.type = "gre";
     }
-    dpif_netdev_offload_ports_traverse(get_vport_netdev_cb, &aux);
+    dpif_offload_dpdk_traverse_ports(offload, get_vport_netdev_cb, &aux);
 
     return aux.vport;
 }
 
-int
-netdev_offload_dpdk_hw_miss_packet_recover(struct netdev *netdev,
-                                           struct dp_packet *packet)
+int netdev_offload_dpdk_hw_miss_packet_recover(
+    struct dpif_offload_dpdk *offload, struct netdev *netdev, unsigned pmd_id,
+    struct dp_packet *packet, void **flow_reference)
 {
+    struct pmd_id_to_flow_ref_data *pmd_data = NULL;
     struct rte_flow_restore_info rte_restore_info;
     struct rte_flow_tunnel *rte_tnl;
     struct netdev *vport_netdev;
     struct pkt_metadata *md;
     struct flow_tnl *md_tnl;
     odp_port_t vport_odp;
+    uint32_t flow_mark;
     int ret = 0;
 
+    if (dp_packet_has_flow_mark(packet, &flow_mark)) {
+        struct ufid_to_rte_flow_data *data;
+
+        data = mark_to_rte_flow_data_find(netdev, flow_mark);
+        if (data) {
+            pmd_data = netdev_offload_dpdk_pmd_data_get_data(data, pmd_id);
+        }
+    }
+    if (pmd_data) {
+        *flow_reference = pmd_data->flow_reference;
+    } else {
+        *flow_reference = NULL;
+    }
+
     ret = netdev_dpdk_rte_flow_get_restore_info(netdev, packet,
                                                 &rte_restore_info, NULL);
     if (ret) {
@@ -2718,7 +3071,7 @@  netdev_offload_dpdk_hw_miss_packet_recover(struct netdev *netdev,
     }
 
     rte_tnl = &rte_restore_info.tunnel;
-    vport_netdev = get_vport_netdev(rte_tnl, &vport_odp);
+    vport_netdev = get_vport_netdev(offload, rte_tnl, &vport_odp);
     if (!vport_netdev) {
         VLOG_WARN_RL(&rl, "Could not find vport netdev");
         return EOPNOTSUPP;
@@ -2780,7 +3133,8 @@  close_vport_netdev:
 }
 
 uint64_t
-netdev_offload_dpdk_flow_get_n_offloaded(struct netdev *netdev)
+netdev_offload_dpdk_flow_get_n_offloaded(struct netdev *netdev,
+                                         unsigned int offload_thread_count)
 {
     struct netdev_offload_dpdk_data *data;
     uint64_t total = 0;
@@ -2792,9 +3146,24 @@  netdev_offload_dpdk_flow_get_n_offloaded(struct netdev *netdev)
         return 0;
     }
 
-    for (tid = 0; tid < dpdk_offload_thread_nb(); tid++) {
+    for (tid = 0; tid < offload_thread_count; tid++) {
         total += data->rte_flow_counters[tid];
     }
 
     return total;
 }
+
+uint64_t
+netdev_offload_dpdk_flow_get_n_offloaded_by_thread(struct netdev *netdev,
+                                                   unsigned int tid)
+{
+    struct netdev_offload_dpdk_data *data;
+
+    data = (struct netdev_offload_dpdk_data *)
+        ovsrcu_get(void *, &netdev->hw_info.offload_data);
+    if (!data) {
+        return 0;
+    }
+
+    return data->rte_flow_counters[tid];
+}
diff --git a/lib/netdev-offload-dpdk.h b/lib/netdev-offload-dpdk.h
index 3587a3a72..818806b82 100644
--- a/lib/netdev-offload-dpdk.h
+++ b/lib/netdev-offload-dpdk.h
@@ -18,61 +18,40 @@ 
  #define NETDEV_OFFLOAD_DPDK_H
 
 /* Forward declarations of private structures. */
+struct dpif_offload_dpdk;
 struct netdev;
-struct dpif_netdev_offload_info;
 
 /* Netdev-specific offload functions.  These should only be used by the
  * associated dpif offload provider. */
-int netdev_offload_dpdk_init(struct netdev *);
+int netdev_offload_dpdk_init(struct netdev *,
+                             unsigned int offload_thread_count);
 void netdev_offload_dpdk_uninit(struct netdev *);
-int netdev_offload_dpdk_flow_flush(struct netdev *);
-uint64_t netdev_offload_dpdk_flow_get_n_offloaded(struct netdev *);
-int netdev_offload_dpdk_hw_miss_packet_recover(struct netdev *,
-                                               struct dp_packet *);
-#ifdef DPDK_NETDEV
-int netdev_offload_dpdk_flow_put(struct netdev *, struct match *,
+int netdev_offload_dpdk_flow_flush(struct dpif_offload_dpdk *,
+                                   struct netdev *);
+uint64_t netdev_offload_dpdk_flow_get_n_offloaded(
+    struct netdev *, unsigned int offload_thread_count);
+uint64_t netdev_offload_dpdk_flow_get_n_offloaded_by_thread(
+    struct netdev *, unsigned int tid);
+int netdev_offload_dpdk_hw_miss_packet_recover(struct dpif_offload_dpdk *,
+                                               struct netdev *,
+                                               unsigned pmd_id,
+                                               struct dp_packet *,
+                                               void **flow_reference);
+int netdev_offload_dpdk_flow_put(struct dpif_offload_dpdk *,
+                                 unsigned pmd_id, void *flow_reference,
+                                 struct netdev *, struct match *,
                                  struct nlattr *actions, size_t actions_len,
                                  const ovs_u128 *ufid,
-                                 struct dpif_netdev_offload_info *,
+                                 odp_port_t orig_in_port,
+                                 void **previous_flow_reference,
                                  struct dpif_flow_stats *);
-int netdev_offload_dpdk_flow_del(struct netdev *, const ovs_u128 *ufid,
+int netdev_offload_dpdk_flow_del(struct dpif_offload_dpdk *, struct netdev *,
+                                 unsigned pmd_id, const ovs_u128 *ufid,
+                                 void *flow_reference,
                                  struct dpif_flow_stats *);
 int netdev_offload_dpdk_flow_get(struct netdev *, struct match *,
                                  struct nlattr **actions, const ovs_u128 *ufid,
                                  struct dpif_flow_stats *,
                                  struct dpif_flow_attrs *, struct ofpbuf *buf);
-#else
-static inline int
-netdev_offload_dpdk_flow_put(struct netdev *netdev OVS_UNUSED,
-                              struct match *match OVS_UNUSED,
-                              struct nlattr *actions OVS_UNUSED,
-                              size_t actions_len OVS_UNUSED,
-                              const ovs_u128 *ufid OVS_UNUSED,
-                              struct dpif_netdev_offload_info *info OVS_UNUSED,
-                              struct dpif_flow_stats *stats OVS_UNUSED)
-{
-    return EOPNOTSUPP;
-}
-
-static inline int
-netdev_offload_dpdk_flow_del(struct netdev *netdev OVS_UNUSED,
-                             const ovs_u128 *ufid OVS_UNUSED,
-                             struct dpif_flow_stats *stats OVS_UNUSED)
-{
-    return EOPNOTSUPP;
-}
-
-static inline int
-netdev_offload_dpdk_flow_get(struct netdev *netdev OVS_UNUSED,
-                             struct match *match OVS_UNUSED,
-                             struct nlattr **actions OVS_UNUSED,
-                             const ovs_u128 *ufid OVS_UNUSED,
-                             struct dpif_flow_stats *stats OVS_UNUSED,
-                             struct dpif_flow_attrs *attrs OVS_UNUSED,
-                             struct ofpbuf *buf OVS_UNUSED)
-{
-    return EOPNOTSUPP;
-}
-#endif /* #ifdef DPDK_NETDEV */
 
 #endif /* NETDEV_OFFLOAD_DPDK_H */