[ovs-dev,v8,5/6] dpif-netdev: do hw flow offload in a thread

Message ID 8ab46eb0b4bf178fccc7dae008c10beb290d291e.1522136948.git.shahafs@mellanox.com
State Superseded
Headers show
Series
  • OVS-DPDK flow offload with rte_flow
Related show

Commit Message

Shahaf Shuler March 27, 2018, 7:54 a.m.
From: Yuanhan Liu <yliu@fridaylinux.org>

Currently, the major trigger for hw flow offload is at upcall handling,
which is actually in the datapath. Moreover, the hw offload installation
and modification is not that lightweight. Meaning, if there are so many
flows being added or modified frequently, it could stall the datapath,
which could result in packet loss.

To diminish that, all those flow operations will be recorded and appended
to a list. A thread is then introduced to process this list (to do the
real flow offloading put/del operations). This could leave the datapath
as lightweight as possible.

Signed-off-by: Yuanhan Liu <yliu@fridaylinux.org>
Signed-off-by: Shahaf Shuler <shahafs@mellanox.com>
---
 lib/dpif-netdev.c | 348 ++++++++++++++++++++++++++++++++++++-------------
 1 file changed, 258 insertions(+), 90 deletions(-)

Comments

Stokes, Ian April 10, 2018, 7:58 p.m. | #1
> Currently, the major trigger for hw flow offload is at upcall handling,
> which is actually in the datapath. Moreover, the hw offload installation
> and modification is not that lightweight. Meaning, if there are so many
> flows being added or modified frequently, it could stall the datapath,
> which could result to packet loss.
> 
> To diminish that, all those flow operations will be recorded and appended
> to a list. A thread is then introduced to process this list (to do the
> real flow offloading put/del operations). This could leave the datapath as
> lightweight as possible.

Just a general query and not related to this patch specifically, but have you given any thought to statistics for the HWOL use case? Should they be tracked in any way by OVS, or, if tracked by the NIC, can they be accessed by OVS?

More comments inline below.
> 
> Signed-off-by: Yuanhan Liu <yliu@fridaylinux.org>
> Signed-off-by: Shahaf Shuler <shahafs@mellanox.com>
> ---
>  lib/dpif-netdev.c | 348 ++++++++++++++++++++++++++++++++++++-------------
>  1 file changed, 258 insertions(+), 90 deletions(-)
> 
> diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c index 7489a2f..8300286
> 100644
> --- a/lib/dpif-netdev.c
> +++ b/lib/dpif-netdev.c
> @@ -345,6 +345,12 @@ enum rxq_cycles_counter_type {
>      RXQ_N_CYCLES
>  };
> 
> +enum {
> +    DP_NETDEV_FLOW_OFFLOAD_OP_ADD,
> +    DP_NETDEV_FLOW_OFFLOAD_OP_MOD,
> +    DP_NETDEV_FLOW_OFFLOAD_OP_DEL,
> +};
> +
>  #define XPS_TIMEOUT 500000LL    /* In microseconds. */
> 
>  /* Contained by struct dp_netdev_port's 'rxqs' member.  */ @@ -721,6
> +727,8 @@ static inline bool emc_entry_alive(struct emc_entry *ce);
> static void emc_clear_entry(struct emc_entry *ce);
> 
>  static void dp_netdev_request_reconfigure(struct dp_netdev *dp);
> +static void queue_netdev_flow_del(struct dp_netdev_pmd_thread *pmd,
> +                                  struct dp_netdev_flow *flow);
> 
>  static void
>  emc_cache_init(struct emc_cache *flow_cache) @@ -1854,13 +1862,11 @@
> struct flow_mark {
>      struct cmap megaflow_to_mark;
>      struct cmap mark_to_flow;
>      struct id_pool *pool;
> -    struct ovs_mutex mutex;
>  };
> 
>  static struct flow_mark flow_mark = {
>      .megaflow_to_mark = CMAP_INITIALIZER,
>      .mark_to_flow = CMAP_INITIALIZER,
> -    .mutex = OVS_MUTEX_INITIALIZER,
>  };
> 
>  static uint32_t
> @@ -2010,7 +2016,7 @@ flow_mark_flush(struct dp_netdev_pmd_thread *pmd)
> 
>      CMAP_FOR_EACH (flow, mark_node, &flow_mark.mark_to_flow) {
>          if (flow->pmd_id == pmd->core_id) {
> -            mark_to_flow_disassociate(pmd, flow);
> +            queue_netdev_flow_del(pmd, flow);
>          }
>      }
>  }
> @@ -2032,6 +2038,251 @@ mark_to_flow_find(const struct
> dp_netdev_pmd_thread *pmd,
>      return NULL;
>  }
> 
> +struct dp_flow_offload_item {
> +    struct dp_netdev_pmd_thread *pmd;
> +    struct dp_netdev_flow *flow;
> +    int op;
> +    struct match match;
> +    struct nlattr *actions;
> +    size_t actions_len;
> +
> +    struct ovs_list node;
> +};
> +
> +struct dp_flow_offload {
> +    struct ovs_mutex mutex;
> +    struct ovs_list list;
> +    pthread_cond_t cond;
> +};
> +
> +static struct dp_flow_offload dp_flow_offload = {
> +    .mutex = OVS_MUTEX_INITIALIZER,
> +    .list  = OVS_LIST_INITIALIZER(&dp_flow_offload.list),
> +};
> +
> +static struct ovsthread_once offload_thread_once
> +    = OVSTHREAD_ONCE_INITIALIZER;

The structs above are declared mid file after the pre-existing mark_to_flow_find function.

It would look cleaner if declared toward the beginning with the enums etc, so as to keep functions and structs separate.
> +
> +static struct dp_flow_offload_item *
> +dp_netdev_alloc_flow_offload(struct dp_netdev_pmd_thread *pmd,
> +                             struct dp_netdev_flow *flow,
> +                             int op)
> +{
> +    struct dp_flow_offload_item *offload;
> +
> +    offload = xzalloc(sizeof(*offload));
> +    offload->pmd = pmd;
> +    offload->flow = flow;
> +    offload->op = op;
> +
> +    dp_netdev_flow_ref(flow);
> +    dp_netdev_pmd_try_ref(pmd);
> +
> +    return offload;
> +}
> +
> +static void
> +dp_netdev_free_flow_offload(struct dp_flow_offload_item *offload) {
> +    dp_netdev_pmd_unref(offload->pmd);
> +    dp_netdev_flow_unref(offload->flow);
> +
> +    free(offload->actions);
> +    free(offload);
> +}
> +
> +static void
> +dp_netdev_append_flow_offload(struct dp_flow_offload_item *offload) {
> +    ovs_mutex_lock(&dp_flow_offload.mutex);
> +    ovs_list_push_back(&dp_flow_offload.list, &offload->node);
> +    xpthread_cond_signal(&dp_flow_offload.cond);
> +    ovs_mutex_unlock(&dp_flow_offload.mutex);
> +}
> +
> +static int
> +dp_netdev_flow_offload_del(struct dp_flow_offload_item *offload) {
> +    return mark_to_flow_disassociate(offload->pmd, offload->flow); }
> +
> +/*
> + * There are two flow offload operations here: addition and modification.
> + *
> + * For flow addition, this function does:
> + * - allocate a new flow mark id
> + * - perform hardware flow offload
> + * - associate the flow mark with flow and mega flow
> + *
> + * For flow modification, both flow mark and the associations are still
> + * valid, thus only item 2 needed.
> + */
> +static int
> +dp_netdev_flow_offload_put(struct dp_flow_offload_item *offload) {
> +    struct dp_netdev_port *port;
> +    struct dp_netdev_pmd_thread *pmd = offload->pmd;
> +    struct dp_netdev_flow *flow = offload->flow;
> +    odp_port_t in_port = flow->flow.in_port.odp_port;
> +    bool modification = offload->op == DP_NETDEV_FLOW_OFFLOAD_OP_MOD;
> +    struct offload_info info;
> +    uint32_t mark;
> +    int ret;
> +
> +    if (flow->dead) {
> +        return -1;
> +    }
> +
> +    if (modification) {
> +        mark = flow->mark;
> +        ovs_assert(mark != INVALID_FLOW_MARK);
> +    } else {
> +        /*
> +         * If a mega flow has already been offloaded (from other PMD
> +         * instances), do not offload it again.
> +         */
> +        mark = megaflow_to_mark_find(&flow->mega_ufid);
> +        if (mark != INVALID_FLOW_MARK) {
> +            VLOG_DBG("Flow has already been offloaded with mark %u\n",
> mark);
> +            if (flow->mark != INVALID_FLOW_MARK) {
> +                ovs_assert(flow->mark == mark);
> +            } else {
> +                mark_to_flow_associate(mark, flow);
> +            }
> +            return 0;
> +        }
> +
> +        mark = flow_mark_alloc();
> +        if (mark == INVALID_FLOW_MARK) {
> +            VLOG_ERR("Failed to allocate flow mark!\n");
> +        }
> +    }
> +    info.flow_mark = mark;
> +
> +    ovs_mutex_lock(&pmd->dp->port_mutex);
> +    port = dp_netdev_lookup_port(pmd->dp, in_port);
> +    if (!port) {
> +        ovs_mutex_unlock(&pmd->dp->port_mutex);
> +        return -1;
> +    }
> +    ret = netdev_flow_put(port->netdev, &offload->match,
> +                          CONST_CAST(struct nlattr *, offload->actions),
> +                          offload->actions_len, &flow->mega_ufid, &info,
> +                          NULL);
> +    ovs_mutex_unlock(&pmd->dp->port_mutex);
> +
> +    if (ret) {
> +        if (!modification) {
> +            flow_mark_free(mark);
> +        } else {
> +            mark_to_flow_disassociate(pmd, flow);
> +        }
> +        return -1;
> +    }
> +
> +    if (!modification) {
> +        megaflow_to_mark_associate(&flow->mega_ufid, mark);
> +        mark_to_flow_associate(mark, flow);
> +    }
> +
> +    return 0;
> +}
> +
> +static void *
> +dp_netdev_flow_offload_main(void *data OVS_UNUSED) {
> +    struct dp_flow_offload_item *offload;
> +    struct ovs_list *list;
> +    const char *op;
> +    int ret;
> +
> +    for (;;) {
> +        ovs_mutex_lock(&dp_flow_offload.mutex);
> +        if (ovs_list_is_empty(&dp_flow_offload.list)) {
> +            ovsrcu_quiesce_start();
> +            ovs_mutex_cond_wait(&dp_flow_offload.cond,
> +                                &dp_flow_offload.mutex);
> +        }
> +        list = ovs_list_pop_front(&dp_flow_offload.list);
> +        offload = CONTAINER_OF(list, struct dp_flow_offload_item, node);
> +        ovs_mutex_unlock(&dp_flow_offload.mutex);
> +
> +        switch (offload->op) {
> +        case DP_NETDEV_FLOW_OFFLOAD_OP_ADD:
> +            op = "add";
> +            ret = dp_netdev_flow_offload_put(offload);
> +            break;
> +        case DP_NETDEV_FLOW_OFFLOAD_OP_MOD:
> +            op = "modify";
> +            ret = dp_netdev_flow_offload_put(offload);
> +            break;
> +        case DP_NETDEV_FLOW_OFFLOAD_OP_DEL:
> +            op = "delete";
> +            ret = dp_netdev_flow_offload_del(offload);
> +            break;
> +        default:
> +            OVS_NOT_REACHED();
> +        }
> +
> +        VLOG_DBG("%s to %s netdev flow\n",
> +                 ret == 0 ? "succeed" : "failed", op);
> +        dp_netdev_free_flow_offload(offload);
> +    }
> +
> +    return NULL;
> +}
> +
> +static void
> +queue_netdev_flow_del(struct dp_netdev_pmd_thread *pmd,
> +                      struct dp_netdev_flow *flow) {
> +    struct dp_flow_offload_item *offload;
> +
> +    if (ovsthread_once_start(&offload_thread_once)) {
> +        xpthread_cond_init(&dp_flow_offload.cond, NULL);
> +        ovs_thread_create("dp_netdev_flow_offload",
> +                          dp_netdev_flow_offload_main, NULL);
> +        ovsthread_once_done(&offload_thread_once);
> +    }
> +
> +    offload = dp_netdev_alloc_flow_offload(pmd, flow,
> +
> DP_NETDEV_FLOW_OFFLOAD_OP_DEL);
> +    dp_netdev_append_flow_offload(offload);
> +}
> +
> +static void
> +queue_netdev_flow_put(struct dp_netdev_pmd_thread *pmd,
> +                      struct dp_netdev_flow *flow, struct match *match,
> +                      const struct nlattr *actions, size_t actions_len)
> +{
> +    struct dp_flow_offload_item *offload;
> +    int op;
> +
> +    if (!netdev_is_flow_api_enabled()) {
> +        return;
> +    }
> +
> +    if (ovsthread_once_start(&offload_thread_once)) {
> +        xpthread_cond_init(&dp_flow_offload.cond, NULL);
> +        ovs_thread_create("dp_netdev_flow_offload",
> +                          dp_netdev_flow_offload_main, NULL);
> +        ovsthread_once_done(&offload_thread_once);
> +    }
> +
> +    if (flow->mark != INVALID_FLOW_MARK) {
> +        op = DP_NETDEV_FLOW_OFFLOAD_OP_MOD;
> +    } else {
> +        op = DP_NETDEV_FLOW_OFFLOAD_OP_ADD;
> +    }
> +    offload = dp_netdev_alloc_flow_offload(pmd, flow, op);
> +    offload->match = *match;
> +    offload->actions = xmalloc(actions_len);
> +    memcpy(offload->actions, actions, actions_len);
> +    offload->actions_len = actions_len;
> +
> +    dp_netdev_append_flow_offload(offload);
> +}
> +
>  static void
>  dp_netdev_pmd_remove_flow(struct dp_netdev_pmd_thread *pmd,
>                            struct dp_netdev_flow *flow) @@ -2046,7 +2297,7
> @@ dp_netdev_pmd_remove_flow(struct dp_netdev_pmd_thread *pmd,
>      dpcls_remove(cls, &flow->cr);
>      cmap_remove(&pmd->flow_table, node, dp_netdev_flow_hash(&flow-
> >ufid));
>      if (flow->mark != INVALID_FLOW_MARK) {
> -        mark_to_flow_disassociate(pmd, flow);
> +        queue_netdev_flow_del(pmd, flow);
>      }
>      flow->dead = true;
> 
> @@ -2627,88 +2878,6 @@ out:
>      return error;
>  }
> 
> -/*
> - * There are two flow offload operations here: addition and modification.
> - *
> - * For flow addition, this function does:
> - * - allocate a new flow mark id
> - * - perform hardware flow offload
> - * - associate the flow mark with flow and mega flow
> - *
> - * For flow modification, both flow mark and the associations are still
> - * valid, thus only item 2 needed.
> - */
> -static void
> -try_netdev_flow_put(struct dp_netdev_pmd_thread *pmd, odp_port_t in_port,
> -                    struct dp_netdev_flow *flow, struct match *match,
> -                    const struct nlattr *actions, size_t actions_len)
> -{
> -    struct offload_info info;
> -    struct dp_netdev_port *port;
> -    bool modification = flow->mark != INVALID_FLOW_MARK;
> -    const char *op = modification ? "modify" : "add";
> -    uint32_t mark;
> -    int ret;
> -
> -    ovs_mutex_lock(&flow_mark.mutex);
> -
> -    if (modification) {
> -        mark = flow->mark;
> -    } else {
> -        if (!netdev_is_flow_api_enabled()) {
> -            goto out;
> -        }
> -
> -        /*
> -         * If a mega flow has already been offloaded (from other PMD
> -         * instances), do not offload it again.
> -         */
> -        mark = megaflow_to_mark_find(&flow->mega_ufid);
> -        if (mark != INVALID_FLOW_MARK) {
> -            VLOG_DBG("Flow has already been offloaded with mark %u\n",
> mark);
> -            mark_to_flow_associate(mark, flow);
> -            goto out;
> -        }
> -
> -        mark = flow_mark_alloc();
> -        if (mark == INVALID_FLOW_MARK) {
> -            VLOG_ERR("Failed to allocate flow mark!\n");
> -            goto out;
> -        }
> -    }
> -    info.flow_mark = mark;
> -
> -    ovs_mutex_lock(&pmd->dp->port_mutex);
> -    port = dp_netdev_lookup_port(pmd->dp, in_port);
> -    if (!port) {
> -        ovs_mutex_unlock(&pmd->dp->port_mutex);
> -        goto out;
> -    }
> -    ret = netdev_flow_put(port->netdev, match,
> -                          CONST_CAST(struct nlattr *, actions),
> -                          actions_len, &flow->mega_ufid, &info, NULL);
> -    ovs_mutex_unlock(&pmd->dp->port_mutex);
> -
> -    if (ret) {
> -        VLOG_ERR("Failed to %s netdev flow with mark %u\n", op, mark);
> -        if (!modification) {
> -            flow_mark_free(mark);
> -        } else {
> -            mark_to_flow_disassociate(pmd, flow);
> -        }
> -        goto out;
> -    }
> -
> -    if (!modification) {
> -        megaflow_to_mark_associate(&flow->mega_ufid, mark);
> -        mark_to_flow_associate(mark, flow);
> -    }
> -    VLOG_DBG("Succeed to %s netdev flow with mark %u\n", op, mark);
> -
> -out:
> -    ovs_mutex_unlock(&flow_mark.mutex);
> -}
> -
>  static void
>  dp_netdev_get_mega_ufid(const struct match *match, ovs_u128 *mega_ufid)
> { @@ -2774,7 +2943,7 @@ dp_netdev_flow_add(struct dp_netdev_pmd_thread
> *pmd,
>      cmap_insert(&pmd->flow_table, CONST_CAST(struct cmap_node *, &flow-
> >node),
>                  dp_netdev_flow_hash(&flow->ufid));
> 
> -    try_netdev_flow_put(pmd, in_port, flow, match, actions, actions_len);
> +    queue_netdev_flow_put(pmd, flow, match, actions, actions_len);
> 
>      if (OVS_UNLIKELY(!VLOG_DROP_DBG((&upcall_rl)))) {
>          struct ds ds = DS_EMPTY_INITIALIZER; @@ -2856,7 +3025,6 @@
> flow_put_on_pmd(struct dp_netdev_pmd_thread *pmd,
>          if (put->flags & DPIF_FP_MODIFY) {
>              struct dp_netdev_actions *new_actions;
>              struct dp_netdev_actions *old_actions;
> -            odp_port_t in_port = netdev_flow->flow.in_port.odp_port;
> 
>              new_actions = dp_netdev_actions_create(put->actions,
>                                                     put->actions_len); @@
> -2864,8 +3032,8 @@ flow_put_on_pmd(struct dp_netdev_pmd_thread *pmd,
>              old_actions = dp_netdev_flow_get_actions(netdev_flow);
>              ovsrcu_set(&netdev_flow->actions, new_actions);
> 
> -            try_netdev_flow_put(pmd, in_port, netdev_flow, match,
> -                                put->actions, put->actions_len);
> +            queue_netdev_flow_put(pmd, netdev_flow, match,
> +                                  put->actions, put->actions_len);
> 
>              if (stats) {
>                  get_dpif_flow_stats(netdev_flow, stats);
> --
> 2.7.4
Shahaf Shuler April 11, 2018, 12:35 p.m. | #2
Tuesday, April 10, 2018 10:58 PM, Stokes, Ian:
> Subject: RE: [PATCH v8 5/6] dpif-netdev: do hw flow offload in a thread
> 
> > Currently, the major trigger for hw flow offload is at upcall
> > handling, which is actually in the datapath. Moreover, the hw offload
> > installation and modification is not that lightweight. Meaning, if
> > there are so many flows being added or modified frequently, it could
> > stall the datapath, which could result to packet loss.
> >
> > To diminish that, all those flow operations will be recorded and
> > appended to a list. A thread is then introduced to process this list
> > (to do the real flow offloading put/del operations). This could leave
> > the datapath as lightweight as possible.
> 
> Just a general query and not related to this patch specifically, but have you
> given any thought to statistics for the HWOL usecase? Should they be tracked
> in any way for OVS or if tracked by the NIC can they be accessed by OVS?

For hardware offload, on supported devices, we can enlist the help of the NIC for statistics of the flow rules.
The APIs are already in DPDK. One can associate a flow rule with a count action to get a full count by the NIC of the packets matching this flow.
In case the HW does not support it, the counting can be done by OVS.

For Mellanox, flow counters in HW are supported. I am not sure about the rest of the vendors.

Having said that, I think the gain from HW offload for flow mark statistics is low, because the packet is processed by OVS regardless.
For more advanced HWOL, like drop or redirect actions, it is mandatory.

> 
> More comments inline below.
> >
> > Signed-off-by: Yuanhan Liu <yliu@fridaylinux.org>
> > Signed-off-by: Shahaf Shuler <shahafs@mellanox.com>
> > ---
> >  lib/dpif-netdev.c | 348
> > ++++++++++++++++++++++++++++++++++++-------------
> >  1 file changed, 258 insertions(+), 90 deletions(-)
> >
> > diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c index
> > 7489a2f..8300286
> > 100644
> > --- a/lib/dpif-netdev.c
> > +++ b/lib/dpif-netdev.c
> > @@ -345,6 +345,12 @@ enum rxq_cycles_counter_type {
> >      RXQ_N_CYCLES
> >  };
> >
> > +enum {
> > +    DP_NETDEV_FLOW_OFFLOAD_OP_ADD,
> > +    DP_NETDEV_FLOW_OFFLOAD_OP_MOD,
> > +    DP_NETDEV_FLOW_OFFLOAD_OP_DEL,
> > +};
> > +
> >  #define XPS_TIMEOUT 500000LL    /* In microseconds. */
> >
> >  /* Contained by struct dp_netdev_port's 'rxqs' member.  */ @@ -721,6
> > +727,8 @@ static inline bool emc_entry_alive(struct emc_entry *ce);
> > static void emc_clear_entry(struct emc_entry *ce);
> >
> >  static void dp_netdev_request_reconfigure(struct dp_netdev *dp);
> > +static void queue_netdev_flow_del(struct dp_netdev_pmd_thread
> *pmd,
> > +                                  struct dp_netdev_flow *flow);
> >
> >  static void
> >  emc_cache_init(struct emc_cache *flow_cache) @@ -1854,13 +1862,11
> @@
> > struct flow_mark {
> >      struct cmap megaflow_to_mark;
> >      struct cmap mark_to_flow;
> >      struct id_pool *pool;
> > -    struct ovs_mutex mutex;
> >  };
> >
> >  static struct flow_mark flow_mark = {
> >      .megaflow_to_mark = CMAP_INITIALIZER,
> >      .mark_to_flow = CMAP_INITIALIZER,
> > -    .mutex = OVS_MUTEX_INITIALIZER,
> >  };
> >
> >  static uint32_t
> > @@ -2010,7 +2016,7 @@ flow_mark_flush(struct dp_netdev_pmd_thread
> > *pmd)
> >
> >      CMAP_FOR_EACH (flow, mark_node, &flow_mark.mark_to_flow) {
> >          if (flow->pmd_id == pmd->core_id) {
> > -            mark_to_flow_disassociate(pmd, flow);
> > +            queue_netdev_flow_del(pmd, flow);
> >          }
> >      }
> >  }
> > @@ -2032,6 +2038,251 @@ mark_to_flow_find(const struct
> > dp_netdev_pmd_thread *pmd,
> >      return NULL;
> >  }
> >
> > +struct dp_flow_offload_item {
> > +    struct dp_netdev_pmd_thread *pmd;
> > +    struct dp_netdev_flow *flow;
> > +    int op;
> > +    struct match match;
> > +    struct nlattr *actions;
> > +    size_t actions_len;
> > +
> > +    struct ovs_list node;
> > +};
> > +
> > +struct dp_flow_offload {
> > +    struct ovs_mutex mutex;
> > +    struct ovs_list list;
> > +    pthread_cond_t cond;
> > +};
> > +
> > +static struct dp_flow_offload dp_flow_offload = {
> > +    .mutex = OVS_MUTEX_INITIALIZER,
> > +    .list  = OVS_LIST_INITIALIZER(&dp_flow_offload.list),
> > +};
> > +
> > +static struct ovsthread_once offload_thread_once
> > +    = OVSTHREAD_ONCE_INITIALIZER;
> 
> The structs above are declared mid file after the pre-existing
> mark_to_flow_find function.
> 
> It would look cleaner if declared toward the beginning with the enums etc, so
> as to keep functions and structs separate.
> > +
> > +static struct dp_flow_offload_item *
> > +dp_netdev_alloc_flow_offload(struct dp_netdev_pmd_thread *pmd,
> > +                             struct dp_netdev_flow *flow,
> > +                             int op)
> > +{
> > +    struct dp_flow_offload_item *offload;
> > +
> > +    offload = xzalloc(sizeof(*offload));
> > +    offload->pmd = pmd;
> > +    offload->flow = flow;
> > +    offload->op = op;
> > +
> > +    dp_netdev_flow_ref(flow);
> > +    dp_netdev_pmd_try_ref(pmd);
> > +
> > +    return offload;
> > +}
> > +
> > +static void
> > +dp_netdev_free_flow_offload(struct dp_flow_offload_item *offload) {
> > +    dp_netdev_pmd_unref(offload->pmd);
> > +    dp_netdev_flow_unref(offload->flow);
> > +
> > +    free(offload->actions);
> > +    free(offload);
> > +}
> > +
> > +static void
> > +dp_netdev_append_flow_offload(struct dp_flow_offload_item
> *offload) {
> > +    ovs_mutex_lock(&dp_flow_offload.mutex);
> > +    ovs_list_push_back(&dp_flow_offload.list, &offload->node);
> > +    xpthread_cond_signal(&dp_flow_offload.cond);
> > +    ovs_mutex_unlock(&dp_flow_offload.mutex);
> > +}
> > +
> > +static int
> > +dp_netdev_flow_offload_del(struct dp_flow_offload_item *offload) {
> > +    return mark_to_flow_disassociate(offload->pmd, offload->flow); }
> > +
> > +/*
> > + * There are two flow offload operations here: addition and modification.
> > + *
> > + * For flow addition, this function does:
> > + * - allocate a new flow mark id
> > + * - perform hardware flow offload
> > + * - associate the flow mark with flow and mega flow
> > + *
> > + * For flow modification, both flow mark and the associations are
> > +still
> > + * valid, thus only item 2 needed.
> > + */
> > +static int
> > +dp_netdev_flow_offload_put(struct dp_flow_offload_item *offload) {
> > +    struct dp_netdev_port *port;
> > +    struct dp_netdev_pmd_thread *pmd = offload->pmd;
> > +    struct dp_netdev_flow *flow = offload->flow;
> > +    odp_port_t in_port = flow->flow.in_port.odp_port;
> > +    bool modification = offload->op ==
> DP_NETDEV_FLOW_OFFLOAD_OP_MOD;
> > +    struct offload_info info;
> > +    uint32_t mark;
> > +    int ret;
> > +
> > +    if (flow->dead) {
> > +        return -1;
> > +    }
> > +
> > +    if (modification) {
> > +        mark = flow->mark;
> > +        ovs_assert(mark != INVALID_FLOW_MARK);
> > +    } else {
> > +        /*
> > +         * If a mega flow has already been offloaded (from other PMD
> > +         * instances), do not offload it again.
> > +         */
> > +        mark = megaflow_to_mark_find(&flow->mega_ufid);
> > +        if (mark != INVALID_FLOW_MARK) {
> > +            VLOG_DBG("Flow has already been offloaded with mark
> > + %u\n",
> > mark);
> > +            if (flow->mark != INVALID_FLOW_MARK) {
> > +                ovs_assert(flow->mark == mark);
> > +            } else {
> > +                mark_to_flow_associate(mark, flow);
> > +            }
> > +            return 0;
> > +        }
> > +
> > +        mark = flow_mark_alloc();
> > +        if (mark == INVALID_FLOW_MARK) {
> > +            VLOG_ERR("Failed to allocate flow mark!\n");
> > +        }
> > +    }
> > +    info.flow_mark = mark;
> > +
> > +    ovs_mutex_lock(&pmd->dp->port_mutex);
> > +    port = dp_netdev_lookup_port(pmd->dp, in_port);
> > +    if (!port) {
> > +        ovs_mutex_unlock(&pmd->dp->port_mutex);
> > +        return -1;
> > +    }
> > +    ret = netdev_flow_put(port->netdev, &offload->match,
> > +                          CONST_CAST(struct nlattr *, offload->actions),
> > +                          offload->actions_len, &flow->mega_ufid, &info,
> > +                          NULL);
> > +    ovs_mutex_unlock(&pmd->dp->port_mutex);
> > +
> > +    if (ret) {
> > +        if (!modification) {
> > +            flow_mark_free(mark);
> > +        } else {
> > +            mark_to_flow_disassociate(pmd, flow);
> > +        }
> > +        return -1;
> > +    }
> > +
> > +    if (!modification) {
> > +        megaflow_to_mark_associate(&flow->mega_ufid, mark);
> > +        mark_to_flow_associate(mark, flow);
> > +    }
> > +
> > +    return 0;
> > +}
> > +
> > +static void *
> > +dp_netdev_flow_offload_main(void *data OVS_UNUSED) {
> > +    struct dp_flow_offload_item *offload;
> > +    struct ovs_list *list;
> > +    const char *op;
> > +    int ret;
> > +
> > +    for (;;) {
> > +        ovs_mutex_lock(&dp_flow_offload.mutex);
> > +        if (ovs_list_is_empty(&dp_flow_offload.list)) {
> > +            ovsrcu_quiesce_start();
> > +            ovs_mutex_cond_wait(&dp_flow_offload.cond,
> > +                                &dp_flow_offload.mutex);
> > +        }
> > +        list = ovs_list_pop_front(&dp_flow_offload.list);
> > +        offload = CONTAINER_OF(list, struct dp_flow_offload_item, node);
> > +        ovs_mutex_unlock(&dp_flow_offload.mutex);
> > +
> > +        switch (offload->op) {
> > +        case DP_NETDEV_FLOW_OFFLOAD_OP_ADD:
> > +            op = "add";
> > +            ret = dp_netdev_flow_offload_put(offload);
> > +            break;
> > +        case DP_NETDEV_FLOW_OFFLOAD_OP_MOD:
> > +            op = "modify";
> > +            ret = dp_netdev_flow_offload_put(offload);
> > +            break;
> > +        case DP_NETDEV_FLOW_OFFLOAD_OP_DEL:
> > +            op = "delete";
> > +            ret = dp_netdev_flow_offload_del(offload);
> > +            break;
> > +        default:
> > +            OVS_NOT_REACHED();
> > +        }
> > +
> > +        VLOG_DBG("%s to %s netdev flow\n",
> > +                 ret == 0 ? "succeed" : "failed", op);
> > +        dp_netdev_free_flow_offload(offload);
> > +    }
> > +
> > +    return NULL;
> > +}
> > +
> > +static void
> > +queue_netdev_flow_del(struct dp_netdev_pmd_thread *pmd,
> > +                      struct dp_netdev_flow *flow) {
> > +    struct dp_flow_offload_item *offload;
> > +
> > +    if (ovsthread_once_start(&offload_thread_once)) {
> > +        xpthread_cond_init(&dp_flow_offload.cond, NULL);
> > +        ovs_thread_create("dp_netdev_flow_offload",
> > +                          dp_netdev_flow_offload_main, NULL);
> > +        ovsthread_once_done(&offload_thread_once);
> > +    }
> > +
> > +    offload = dp_netdev_alloc_flow_offload(pmd, flow,
> > +
> > DP_NETDEV_FLOW_OFFLOAD_OP_DEL);
> > +    dp_netdev_append_flow_offload(offload);
> > +}
> > +
> > +static void
> > +queue_netdev_flow_put(struct dp_netdev_pmd_thread *pmd,
> > +                      struct dp_netdev_flow *flow, struct match *match,
> > +                      const struct nlattr *actions, size_t
> > +actions_len) {
> > +    struct dp_flow_offload_item *offload;
> > +    int op;
> > +
> > +    if (!netdev_is_flow_api_enabled()) {
> > +        return;
> > +    }
> > +
> > +    if (ovsthread_once_start(&offload_thread_once)) {
> > +        xpthread_cond_init(&dp_flow_offload.cond, NULL);
> > +        ovs_thread_create("dp_netdev_flow_offload",
> > +                          dp_netdev_flow_offload_main, NULL);
> > +        ovsthread_once_done(&offload_thread_once);
> > +    }
> > +
> > +    if (flow->mark != INVALID_FLOW_MARK) {
> > +        op = DP_NETDEV_FLOW_OFFLOAD_OP_MOD;
> > +    } else {
> > +        op = DP_NETDEV_FLOW_OFFLOAD_OP_ADD;
> > +    }
> > +    offload = dp_netdev_alloc_flow_offload(pmd, flow, op);
> > +    offload->match = *match;
> > +    offload->actions = xmalloc(actions_len);
> > +    memcpy(offload->actions, actions, actions_len);
> > +    offload->actions_len = actions_len;
> > +
> > +    dp_netdev_append_flow_offload(offload);
> > +}
> > +
> >  static void
> >  dp_netdev_pmd_remove_flow(struct dp_netdev_pmd_thread *pmd,
> >                            struct dp_netdev_flow *flow) @@ -2046,7
> > +2297,7 @@ dp_netdev_pmd_remove_flow(struct
> dp_netdev_pmd_thread *pmd,
> >      dpcls_remove(cls, &flow->cr);
> >      cmap_remove(&pmd->flow_table, node, dp_netdev_flow_hash(&flow-
> > >ufid));
> >      if (flow->mark != INVALID_FLOW_MARK) {
> > -        mark_to_flow_disassociate(pmd, flow);
> > +        queue_netdev_flow_del(pmd, flow);
> >      }
> >      flow->dead = true;
> >
> > @@ -2627,88 +2878,6 @@ out:
> >      return error;
> >  }
> >
> > -/*
> > - * There are two flow offload operations here: addition and modification.
> > - *
> > - * For flow addition, this function does:
> > - * - allocate a new flow mark id
> > - * - perform hardware flow offload
> > - * - associate the flow mark with flow and mega flow
> > - *
> > - * For flow modification, both flow mark and the associations are
> > still
> > - * valid, thus only item 2 needed.
> > - */
> > -static void
> > -try_netdev_flow_put(struct dp_netdev_pmd_thread *pmd, odp_port_t
> in_port,
> > -                    struct dp_netdev_flow *flow, struct match *match,
> > -                    const struct nlattr *actions, size_t actions_len)
> > -{
> > -    struct offload_info info;
> > -    struct dp_netdev_port *port;
> > -    bool modification = flow->mark != INVALID_FLOW_MARK;
> > -    const char *op = modification ? "modify" : "add";
> > -    uint32_t mark;
> > -    int ret;
> > -
> > -    ovs_mutex_lock(&flow_mark.mutex);
> > -
> > -    if (modification) {
> > -        mark = flow->mark;
> > -    } else {
> > -        if (!netdev_is_flow_api_enabled()) {
> > -            goto out;
> > -        }
> > -
> > -        /*
> > -         * If a mega flow has already been offloaded (from other PMD
> > -         * instances), do not offload it again.
> > -         */
> > -        mark = megaflow_to_mark_find(&flow->mega_ufid);
> > -        if (mark != INVALID_FLOW_MARK) {
> > -            VLOG_DBG("Flow has already been offloaded with mark %u\n",
> > mark);
> > -            mark_to_flow_associate(mark, flow);
> > -            goto out;
> > -        }
> > -
> > -        mark = flow_mark_alloc();
> > -        if (mark == INVALID_FLOW_MARK) {
> > -            VLOG_ERR("Failed to allocate flow mark!\n");
> > -            goto out;
> > -        }
> > -    }
> > -    info.flow_mark = mark;
> > -
> > -    ovs_mutex_lock(&pmd->dp->port_mutex);
> > -    port = dp_netdev_lookup_port(pmd->dp, in_port);
> > -    if (!port) {
> > -        ovs_mutex_unlock(&pmd->dp->port_mutex);
> > -        goto out;
> > -    }
> > -    ret = netdev_flow_put(port->netdev, match,
> > -                          CONST_CAST(struct nlattr *, actions),
> > -                          actions_len, &flow->mega_ufid, &info, NULL);
> > -    ovs_mutex_unlock(&pmd->dp->port_mutex);
> > -
> > -    if (ret) {
> > -        VLOG_ERR("Failed to %s netdev flow with mark %u\n", op, mark);
> > -        if (!modification) {
> > -            flow_mark_free(mark);
> > -        } else {
> > -            mark_to_flow_disassociate(pmd, flow);
> > -        }
> > -        goto out;
> > -    }
> > -
> > -    if (!modification) {
> > -        megaflow_to_mark_associate(&flow->mega_ufid, mark);
> > -        mark_to_flow_associate(mark, flow);
> > -    }
> > -    VLOG_DBG("Succeed to %s netdev flow with mark %u\n", op, mark);
> > -
> > -out:
> > -    ovs_mutex_unlock(&flow_mark.mutex);
> > -}
> > -
> >  static void
> >  dp_netdev_get_mega_ufid(const struct match *match, ovs_u128
> > *mega_ufid) { @@ -2774,7 +2943,7 @@ dp_netdev_flow_add(struct
> > dp_netdev_pmd_thread *pmd,
> >      cmap_insert(&pmd->flow_table, CONST_CAST(struct cmap_node *,
> > &flow-
> > >node),
> >                  dp_netdev_flow_hash(&flow->ufid));
> >
> > -    try_netdev_flow_put(pmd, in_port, flow, match, actions, actions_len);
> > +    queue_netdev_flow_put(pmd, flow, match, actions, actions_len);
> >
> >      if (OVS_UNLIKELY(!VLOG_DROP_DBG((&upcall_rl)))) {
> >          struct ds ds = DS_EMPTY_INITIALIZER; @@ -2856,7 +3025,6 @@
> > flow_put_on_pmd(struct dp_netdev_pmd_thread *pmd,
> >          if (put->flags & DPIF_FP_MODIFY) {
> >              struct dp_netdev_actions *new_actions;
> >              struct dp_netdev_actions *old_actions;
> > -            odp_port_t in_port = netdev_flow->flow.in_port.odp_port;
> >
> >              new_actions = dp_netdev_actions_create(put->actions,
> >                                                     put->actions_len);
> > @@
> > -2864,8 +3032,8 @@ flow_put_on_pmd(struct dp_netdev_pmd_thread
> *pmd,
> >              old_actions = dp_netdev_flow_get_actions(netdev_flow);
> >              ovsrcu_set(&netdev_flow->actions, new_actions);
> >
> > -            try_netdev_flow_put(pmd, in_port, netdev_flow, match,
> > -                                put->actions, put->actions_len);
> > +            queue_netdev_flow_put(pmd, netdev_flow, match,
> > +                                  put->actions, put->actions_len);
> >
> >              if (stats) {
> >                  get_dpif_flow_stats(netdev_flow, stats);
> > --
> > 2.7.4

Patch

diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index 7489a2f..8300286 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -345,6 +345,12 @@  enum rxq_cycles_counter_type {
     RXQ_N_CYCLES
 };
 
+enum {
+    DP_NETDEV_FLOW_OFFLOAD_OP_ADD,
+    DP_NETDEV_FLOW_OFFLOAD_OP_MOD,
+    DP_NETDEV_FLOW_OFFLOAD_OP_DEL,
+};
+
 #define XPS_TIMEOUT 500000LL    /* In microseconds. */
 
 /* Contained by struct dp_netdev_port's 'rxqs' member.  */
@@ -721,6 +727,8 @@  static inline bool emc_entry_alive(struct emc_entry *ce);
 static void emc_clear_entry(struct emc_entry *ce);
 
 static void dp_netdev_request_reconfigure(struct dp_netdev *dp);
+static void queue_netdev_flow_del(struct dp_netdev_pmd_thread *pmd,
+                                  struct dp_netdev_flow *flow);
 
 static void
 emc_cache_init(struct emc_cache *flow_cache)
@@ -1854,13 +1862,11 @@  struct flow_mark {
     struct cmap megaflow_to_mark;
     struct cmap mark_to_flow;
     struct id_pool *pool;
-    struct ovs_mutex mutex;
 };
 
 static struct flow_mark flow_mark = {
     .megaflow_to_mark = CMAP_INITIALIZER,
     .mark_to_flow = CMAP_INITIALIZER,
-    .mutex = OVS_MUTEX_INITIALIZER,
 };
 
 static uint32_t
@@ -2010,7 +2016,7 @@  flow_mark_flush(struct dp_netdev_pmd_thread *pmd)
 
     CMAP_FOR_EACH (flow, mark_node, &flow_mark.mark_to_flow) {
         if (flow->pmd_id == pmd->core_id) {
-            mark_to_flow_disassociate(pmd, flow);
+            queue_netdev_flow_del(pmd, flow);
         }
     }
 }
@@ -2032,6 +2038,251 @@  mark_to_flow_find(const struct dp_netdev_pmd_thread *pmd,
     return NULL;
 }
 
+struct dp_flow_offload_item {
+    struct dp_netdev_pmd_thread *pmd;
+    struct dp_netdev_flow *flow;
+    int op;
+    struct match match;
+    struct nlattr *actions;
+    size_t actions_len;
+
+    struct ovs_list node;
+};
+
+struct dp_flow_offload {
+    struct ovs_mutex mutex;
+    struct ovs_list list;
+    pthread_cond_t cond;
+};
+
+static struct dp_flow_offload dp_flow_offload = {
+    .mutex = OVS_MUTEX_INITIALIZER,
+    .list  = OVS_LIST_INITIALIZER(&dp_flow_offload.list),
+};
+
+static struct ovsthread_once offload_thread_once
+    = OVSTHREAD_ONCE_INITIALIZER;
+
+static struct dp_flow_offload_item *
+dp_netdev_alloc_flow_offload(struct dp_netdev_pmd_thread *pmd,
+                             struct dp_netdev_flow *flow,
+                             int op)
+{
+    struct dp_flow_offload_item *offload;
+
+    offload = xzalloc(sizeof(*offload));
+    offload->pmd = pmd;
+    offload->flow = flow;
+    offload->op = op;
+
+    dp_netdev_flow_ref(flow);
+    dp_netdev_pmd_try_ref(pmd);
+
+    return offload;
+}
+
+static void
+dp_netdev_free_flow_offload(struct dp_flow_offload_item *offload)
+{
+    dp_netdev_pmd_unref(offload->pmd);
+    dp_netdev_flow_unref(offload->flow);
+
+    free(offload->actions);
+    free(offload);
+}
+
+static void
+dp_netdev_append_flow_offload(struct dp_flow_offload_item *offload)
+{
+    ovs_mutex_lock(&dp_flow_offload.mutex);
+    ovs_list_push_back(&dp_flow_offload.list, &offload->node);
+    xpthread_cond_signal(&dp_flow_offload.cond);
+    ovs_mutex_unlock(&dp_flow_offload.mutex);
+}
+
+static int
+dp_netdev_flow_offload_del(struct dp_flow_offload_item *offload)
+{
+    return mark_to_flow_disassociate(offload->pmd, offload->flow);
+}
+
+/*
+ * There are two flow offload operations here: addition and modification.
+ *
+ * For flow addition, this function does:
+ * 1. allocate a new flow mark id
+ * 2. perform hardware flow offload
+ * 3. associate the flow mark with flow and mega flow
+ *
+ * For flow modification, both flow mark and the associations are still
+ * valid, thus only item 2 needed.
+ */
+static int
+dp_netdev_flow_offload_put(struct dp_flow_offload_item *offload)
+{
+    struct dp_netdev_port *port;
+    struct dp_netdev_pmd_thread *pmd = offload->pmd;
+    struct dp_netdev_flow *flow = offload->flow;
+    odp_port_t in_port = flow->flow.in_port.odp_port;
+    bool modification = offload->op == DP_NETDEV_FLOW_OFFLOAD_OP_MOD;
+    struct offload_info info;
+    uint32_t mark;
+    int ret;
+
+    if (flow->dead) {
+        return -1;
+    }
+
+    if (modification) {
+        mark = flow->mark;
+        ovs_assert(mark != INVALID_FLOW_MARK);
+    } else {
+        /*
+         * If a mega flow has already been offloaded (from other PMD
+         * instances), do not offload it again.
+         */
+        mark = megaflow_to_mark_find(&flow->mega_ufid);
+        if (mark != INVALID_FLOW_MARK) {
+            VLOG_DBG("Flow has already been offloaded with mark %u\n", mark);
+            if (flow->mark != INVALID_FLOW_MARK) {
+                ovs_assert(flow->mark == mark);
+            } else {
+                mark_to_flow_associate(mark, flow);
+            }
+            return 0;
+        }
+
+        mark = flow_mark_alloc();
+        if (mark == INVALID_FLOW_MARK) {
+            VLOG_ERR("Failed to allocate flow mark!\n");
+            return -1;
+        }
+    }
+    info.flow_mark = mark;
+
+    ovs_mutex_lock(&pmd->dp->port_mutex);
+    port = dp_netdev_lookup_port(pmd->dp, in_port);
+    if (!port) {
+        ovs_mutex_unlock(&pmd->dp->port_mutex);
+        return -1;
+    }
+    ret = netdev_flow_put(port->netdev, &offload->match,
+                          CONST_CAST(struct nlattr *, offload->actions),
+                          offload->actions_len, &flow->mega_ufid, &info,
+                          NULL);
+    ovs_mutex_unlock(&pmd->dp->port_mutex);
+
+    if (ret) {
+        if (!modification) {
+            flow_mark_free(mark);
+        } else {
+            mark_to_flow_disassociate(pmd, flow);
+        }
+        return -1;
+    }
+
+    if (!modification) {
+        megaflow_to_mark_associate(&flow->mega_ufid, mark);
+        mark_to_flow_associate(mark, flow);
+    }
+
+    return 0;
+}
+
+static void *
+dp_netdev_flow_offload_main(void *data OVS_UNUSED)
+{
+    struct dp_flow_offload_item *offload;
+    struct ovs_list *list;
+    const char *op;
+    int ret;
+
+    for (;;) {
+        ovs_mutex_lock(&dp_flow_offload.mutex);
+        if (ovs_list_is_empty(&dp_flow_offload.list)) {
+            ovsrcu_quiesce_start();
+            ovs_mutex_cond_wait(&dp_flow_offload.cond,
+                                &dp_flow_offload.mutex);
+        }
+        list = ovs_list_pop_front(&dp_flow_offload.list);
+        offload = CONTAINER_OF(list, struct dp_flow_offload_item, node);
+        ovs_mutex_unlock(&dp_flow_offload.mutex);
+
+        switch (offload->op) {
+        case DP_NETDEV_FLOW_OFFLOAD_OP_ADD:
+            op = "add";
+            ret = dp_netdev_flow_offload_put(offload);
+            break;
+        case DP_NETDEV_FLOW_OFFLOAD_OP_MOD:
+            op = "modify";
+            ret = dp_netdev_flow_offload_put(offload);
+            break;
+        case DP_NETDEV_FLOW_OFFLOAD_OP_DEL:
+            op = "delete";
+            ret = dp_netdev_flow_offload_del(offload);
+            break;
+        default:
+            OVS_NOT_REACHED();
+        }
+
+        VLOG_DBG("%s to %s netdev flow\n",
+                 ret == 0 ? "succeeded" : "failed", op);
+        dp_netdev_free_flow_offload(offload);
+    }
+
+    return NULL;
+}
+
+static void
+queue_netdev_flow_del(struct dp_netdev_pmd_thread *pmd,
+                      struct dp_netdev_flow *flow)
+{
+    struct dp_flow_offload_item *offload;
+
+    if (ovsthread_once_start(&offload_thread_once)) {
+        xpthread_cond_init(&dp_flow_offload.cond, NULL);
+        ovs_thread_create("dp_netdev_flow_offload",
+                          dp_netdev_flow_offload_main, NULL);
+        ovsthread_once_done(&offload_thread_once);
+    }
+
+    offload = dp_netdev_alloc_flow_offload(pmd, flow,
+                                           DP_NETDEV_FLOW_OFFLOAD_OP_DEL);
+    dp_netdev_append_flow_offload(offload);
+}
+
+static void
+queue_netdev_flow_put(struct dp_netdev_pmd_thread *pmd,
+                      struct dp_netdev_flow *flow, struct match *match,
+                      const struct nlattr *actions, size_t actions_len)
+{
+    struct dp_flow_offload_item *offload;
+    int op;
+
+    if (!netdev_is_flow_api_enabled()) {
+        return;
+    }
+
+    if (ovsthread_once_start(&offload_thread_once)) {
+        xpthread_cond_init(&dp_flow_offload.cond, NULL);
+        ovs_thread_create("dp_netdev_flow_offload",
+                          dp_netdev_flow_offload_main, NULL);
+        ovsthread_once_done(&offload_thread_once);
+    }
+
+    if (flow->mark != INVALID_FLOW_MARK) {
+        op = DP_NETDEV_FLOW_OFFLOAD_OP_MOD;
+    } else {
+        op = DP_NETDEV_FLOW_OFFLOAD_OP_ADD;
+    }
+    offload = dp_netdev_alloc_flow_offload(pmd, flow, op);
+    offload->match = *match;
+    offload->actions = xmalloc(actions_len);
+    memcpy(offload->actions, actions, actions_len);
+    offload->actions_len = actions_len;
+
+    dp_netdev_append_flow_offload(offload);
+}
+
 static void
 dp_netdev_pmd_remove_flow(struct dp_netdev_pmd_thread *pmd,
                           struct dp_netdev_flow *flow)
@@ -2046,7 +2297,7 @@  dp_netdev_pmd_remove_flow(struct dp_netdev_pmd_thread *pmd,
     dpcls_remove(cls, &flow->cr);
     cmap_remove(&pmd->flow_table, node, dp_netdev_flow_hash(&flow->ufid));
     if (flow->mark != INVALID_FLOW_MARK) {
-        mark_to_flow_disassociate(pmd, flow);
+        queue_netdev_flow_del(pmd, flow);
     }
     flow->dead = true;
 
@@ -2627,88 +2878,6 @@  out:
     return error;
 }
 
-/*
- * There are two flow offload operations here: addition and modification.
- *
- * For flow addition, this function does:
- * - allocate a new flow mark id
- * - perform hardware flow offload
- * - associate the flow mark with flow and mega flow
- *
- * For flow modification, both flow mark and the associations are still
- * valid, thus only item 2 needed.
- */
-static void
-try_netdev_flow_put(struct dp_netdev_pmd_thread *pmd, odp_port_t in_port,
-                    struct dp_netdev_flow *flow, struct match *match,
-                    const struct nlattr *actions, size_t actions_len)
-{
-    struct offload_info info;
-    struct dp_netdev_port *port;
-    bool modification = flow->mark != INVALID_FLOW_MARK;
-    const char *op = modification ? "modify" : "add";
-    uint32_t mark;
-    int ret;
-
-    ovs_mutex_lock(&flow_mark.mutex);
-
-    if (modification) {
-        mark = flow->mark;
-    } else {
-        if (!netdev_is_flow_api_enabled()) {
-            goto out;
-        }
-
-        /*
-         * If a mega flow has already been offloaded (from other PMD
-         * instances), do not offload it again.
-         */
-        mark = megaflow_to_mark_find(&flow->mega_ufid);
-        if (mark != INVALID_FLOW_MARK) {
-            VLOG_DBG("Flow has already been offloaded with mark %u\n", mark);
-            mark_to_flow_associate(mark, flow);
-            goto out;
-        }
-
-        mark = flow_mark_alloc();
-        if (mark == INVALID_FLOW_MARK) {
-            VLOG_ERR("Failed to allocate flow mark!\n");
-            goto out;
-        }
-    }
-    info.flow_mark = mark;
-
-    ovs_mutex_lock(&pmd->dp->port_mutex);
-    port = dp_netdev_lookup_port(pmd->dp, in_port);
-    if (!port) {
-        ovs_mutex_unlock(&pmd->dp->port_mutex);
-        goto out;
-    }
-    ret = netdev_flow_put(port->netdev, match,
-                          CONST_CAST(struct nlattr *, actions),
-                          actions_len, &flow->mega_ufid, &info, NULL);
-    ovs_mutex_unlock(&pmd->dp->port_mutex);
-
-    if (ret) {
-        VLOG_ERR("Failed to %s netdev flow with mark %u\n", op, mark);
-        if (!modification) {
-            flow_mark_free(mark);
-        } else {
-            mark_to_flow_disassociate(pmd, flow);
-        }
-        goto out;
-    }
-
-    if (!modification) {
-        megaflow_to_mark_associate(&flow->mega_ufid, mark);
-        mark_to_flow_associate(mark, flow);
-    }
-    VLOG_DBG("Succeed to %s netdev flow with mark %u\n", op, mark);
-
-out:
-    ovs_mutex_unlock(&flow_mark.mutex);
-}
-
 static void
 dp_netdev_get_mega_ufid(const struct match *match, ovs_u128 *mega_ufid)
 {
@@ -2774,7 +2943,7 @@  dp_netdev_flow_add(struct dp_netdev_pmd_thread *pmd,
     cmap_insert(&pmd->flow_table, CONST_CAST(struct cmap_node *, &flow->node),
                 dp_netdev_flow_hash(&flow->ufid));
 
-    try_netdev_flow_put(pmd, in_port, flow, match, actions, actions_len);
+    queue_netdev_flow_put(pmd, flow, match, actions, actions_len);
 
     if (OVS_UNLIKELY(!VLOG_DROP_DBG((&upcall_rl)))) {
         struct ds ds = DS_EMPTY_INITIALIZER;
@@ -2856,7 +3025,6 @@  flow_put_on_pmd(struct dp_netdev_pmd_thread *pmd,
         if (put->flags & DPIF_FP_MODIFY) {
             struct dp_netdev_actions *new_actions;
             struct dp_netdev_actions *old_actions;
-            odp_port_t in_port = netdev_flow->flow.in_port.odp_port;
 
             new_actions = dp_netdev_actions_create(put->actions,
                                                    put->actions_len);
@@ -2864,8 +3032,8 @@  flow_put_on_pmd(struct dp_netdev_pmd_thread *pmd,
             old_actions = dp_netdev_flow_get_actions(netdev_flow);
             ovsrcu_set(&netdev_flow->actions, new_actions);
 
-            try_netdev_flow_put(pmd, in_port, netdev_flow, match,
-                                put->actions, put->actions_len);
+            queue_netdev_flow_put(pmd, netdev_flow, match,
+                                  put->actions, put->actions_len);
 
             if (stats) {
                 get_dpif_flow_stats(netdev_flow, stats);