[ovs-dev,ovs,V7,08/24] dpif-netlink: Dump netdevs flows on flow dump

Message ID 1491570791-55393-9-git-send-email-roid@mellanox.com
State: Changes Requested

Commit Message

Roi Dayan April 7, 2017, 1:12 p.m. UTC
From: Paul Blakey <paulb@mellanox.com>

While dumping flows, also dump the flows that were offloaded to
a netdev and parse them back into dpif flows.

Signed-off-by: Paul Blakey <paulb@mellanox.com>
Reviewed-by: Roi Dayan <roid@mellanox.com>
Reviewed-by: Simon Horman <simon.horman@netronome.com>
---
 lib/dpif-netlink.c | 179 ++++++++++++++++++++++++++++++++++++++++++++++++++++-
 lib/netdev.c       |  32 ++++++++++
 2 files changed, 210 insertions(+), 1 deletion(-)

Comments

Joe Stringer April 14, 2017, 12:51 a.m. UTC | #1
On 7 April 2017 at 06:12, Roi Dayan <roid@mellanox.com> wrote:
> From: Paul Blakey <paulb@mellanox.com>
>
> While dumping flows, dump flows that were offloaded to
> netdev and parse them back to dpif flow.
>
> Signed-off-by: Paul Blakey <paulb@mellanox.com>
> Reviewed-by: Roi Dayan <roid@mellanox.com>
> Reviewed-by: Simon Horman <simon.horman@netronome.com>
> ---
>  lib/dpif-netlink.c | 179 ++++++++++++++++++++++++++++++++++++++++++++++++++++-
>  lib/netdev.c       |  32 ++++++++++
>  2 files changed, 210 insertions(+), 1 deletion(-)
>
> diff --git a/lib/dpif-netlink.c b/lib/dpif-netlink.c
> index 02dd6c2..4e3bc8c 100644
> --- a/lib/dpif-netlink.c
> +++ b/lib/dpif-netlink.c
> @@ -38,6 +38,7 @@
>  #include "flow.h"
>  #include "fat-rwlock.h"
>  #include "netdev.h"
> +#include "netdev-provider.h"
>  #include "netdev-linux.h"
>  #include "netdev-vport.h"
>  #include "netlink-conntrack.h"
> @@ -55,6 +56,7 @@
>  #include "unaligned.h"
>  #include "util.h"
>  #include "openvswitch/vlog.h"
> +#include "openvswitch/match.h"
>
>  VLOG_DEFINE_THIS_MODULE(dpif_netlink);
>  #ifdef _WIN32
> @@ -72,6 +74,8 @@ enum { MAX_PORTS = USHRT_MAX };
>   * missing if we have old headers. */
>  #define ETH_FLAG_LRO      (1 << 15)    /* LRO is enabled */
>
> +#define FLOW_DUMP_MAX_BATCH 50
> +
>  struct dpif_netlink_dp {
>      /* Generic Netlink header. */
>      uint8_t cmd;
> @@ -1373,6 +1377,10 @@ struct dpif_netlink_flow_dump {
>      struct dpif_flow_dump up;
>      struct nl_dump nl_dump;
>      atomic_int status;
> +    struct netdev_flow_dump **netdev_dumps;
> +    int netdev_dumps_num;                    /* Number of netdev_flow_dumps */
> +    struct ovs_mutex netdev_lock;            /* Guards the following. */
> +    int netdev_current_dump OVS_GUARDED;     /* Shared current dump */
>  };
>
>  static struct dpif_netlink_flow_dump *
> @@ -1381,6 +1389,26 @@ dpif_netlink_flow_dump_cast(struct dpif_flow_dump *dump)
>      return CONTAINER_OF(dump, struct dpif_netlink_flow_dump, up);
>  }
>
> +static void
> +start_netdev_dump(const struct dpif *dpif_,
> +                  struct dpif_netlink_flow_dump *dump)
> +{
> +    ovs_mutex_init(&dump->netdev_lock);
> +
> +    if (!netdev_flow_api_enabled) {
> +        dump->netdev_dumps_num = 0;
> +        dump->netdev_dumps = NULL;
> +        return;
> +    }
> +
> +    ovs_mutex_lock(&dump->netdev_lock);
> +    dump->netdev_current_dump = 0;
> +    dump->netdev_dumps
> +        = netdev_ports_flow_dumps_create(DPIF_HMAP_KEY(dpif_),
> +                                         &dump->netdev_dumps_num);
> +    ovs_mutex_unlock(&dump->netdev_lock);
> +}
> +
>  static struct dpif_flow_dump *
>  dpif_netlink_flow_dump_create(const struct dpif *dpif_, bool terse)
>  {
> @@ -1405,6 +1433,8 @@ dpif_netlink_flow_dump_create(const struct dpif *dpif_, bool terse)
>      atomic_init(&dump->status, 0);
>      dump->up.terse = terse;
>
> +    start_netdev_dump(dpif_, dump);
> +
>      return &dump->up;
>  }
>
> @@ -1415,6 +1445,16 @@ dpif_netlink_flow_dump_destroy(struct dpif_flow_dump *dump_)
>      unsigned int nl_status = nl_dump_done(&dump->nl_dump);
>      int dump_status;
>
> +    for (int i = 0; i < dump->netdev_dumps_num; i++) {
> +        int err = netdev_flow_dump_destroy(dump->netdev_dumps[i]);
> +        if (err != 0 && err != EOPNOTSUPP) {
> +            VLOG_ERR("failed dumping netdev: %s", ovs_strerror(err));
> +        }
> +    }
> +
> +    free(dump->netdev_dumps);
> +    ovs_mutex_destroy(&dump->netdev_lock);
> +
>      /* No other thread has access to 'dump' at this point. */
>      atomic_read_relaxed(&dump->status, &dump_status);
>      free(dump);
> @@ -1428,6 +1468,13 @@ struct dpif_netlink_flow_dump_thread {
>      struct dpif_flow_stats stats;
>      struct ofpbuf nl_flows;     /* Always used to store flows. */
>      struct ofpbuf *nl_actions;  /* Used if kernel does not supply actions. */
> +    int netdev_dump_idx;        /* This thread current netdev dump index */
> +    bool netdev_done;           /* If we are finished dumping netdevs */
> +
> +    /* (Key/Mask/Actions) Buffers for netdev dumping */
> +    struct odputil_keybuf keybuf[FLOW_DUMP_MAX_BATCH];
> +    struct odputil_keybuf maskbuf[FLOW_DUMP_MAX_BATCH];
> +    struct odputil_keybuf actbuf[FLOW_DUMP_MAX_BATCH];
>  };
>
>  static struct dpif_netlink_flow_dump_thread *
> @@ -1447,6 +1494,8 @@ dpif_netlink_flow_dump_thread_create(struct dpif_flow_dump *dump_)
>      thread->dump = dump;
>      ofpbuf_init(&thread->nl_flows, NL_DUMP_BUFSIZE);
>      thread->nl_actions = NULL;
> +    thread->netdev_dump_idx = 0;
> +    thread->netdev_done = !(thread->netdev_dump_idx < dump->netdev_dumps_num);
>
>      return &thread->up;
>  }
> @@ -1484,6 +1533,96 @@ dpif_netlink_flow_to_dpif_flow(struct dpif *dpif, struct dpif_flow *dpif_flow,
>      dpif_netlink_flow_get_stats(datapath_flow, &dpif_flow->stats);
>  }
>
> +/* The design is such that all threads are working together on the first dump
> + * to the last, in order (at first they all on dump 0).
> + * When the first thread finds that the given dump is finished,
> + * they all move to the next. If two or more threads find the same dump
> + * is finished at the same time, the first one will advance the shared
> + * netdev_current_dump and the others will catch up. */
> +static void
> +dpif_netlink_advance_netdev_dump(struct dpif_netlink_flow_dump_thread *thread)
> +{
> +    struct dpif_netlink_flow_dump *dump = thread->dump;
> +
> +    ovs_mutex_lock(&dump->netdev_lock);
> +    /* if we haven't finished (dumped everything) */
> +    if (dump->netdev_current_dump < dump->netdev_dumps_num) {
> +        /* if we are the first to find that current dump is finished
> +         * advance it. */
> +        if (thread->netdev_dump_idx == dump->netdev_current_dump) {
> +            thread->netdev_dump_idx = ++dump->netdev_current_dump;
> +            /* did we just finish the last dump? done. */
> +            if (dump->netdev_current_dump == dump->netdev_dumps_num) {
> +                thread->netdev_done = true;
> +            }
> +        } else {
> +            /* otherwise, we are behind, catch up */
> +            thread->netdev_dump_idx = dump->netdev_current_dump;
> +        }
> +    } else {
> +        /* some other thread finished */
> +        thread->netdev_done = true;
> +    }
> +    ovs_mutex_unlock(&dump->netdev_lock);
> +}
> +
> +static struct odp_support netdev_flow_support = {
> +    .max_mpls_depth = SIZE_MAX,
> +    .recirc = false,
> +    .ct_state = false,
> +    .ct_zone = false,
> +    .ct_mark = false,
> +    .ct_label = false,
> +};

Perhaps we should add a comment above this structure stating that it is
only used for translating netdev flows into dpif flows, and that current
implementations of the netdev flow interface don't support these
features, so there is no need to serialize those fields into the
nlattr-formatted buffers in the function below?
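
For illustration, the suggested comment could read something like this
(a sketch only, not part of the posted patch):

    /* Used only when translating netdev flows into dpif flows.  Current
     * implementations of the netdev flow interface do not support
     * recirculation or connection tracking, so these fields never need
     * to be serialized into the nlattr-formatted key/mask buffers built
     * by dpif_netlink_netdev_match_to_dpif_flow() below. */
    static struct odp_support netdev_flow_support = {
        .max_mpls_depth = SIZE_MAX,
        .recirc = false,
        .ct_state = false,
        .ct_zone = false,
        .ct_mark = false,
        .ct_label = false,
    };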

> +
> +static int
> +dpif_netlink_netdev_match_to_dpif_flow(struct match *match,
> +                                       struct ofpbuf *key_buf,
> +                                       struct ofpbuf *mask_buf,
> +                                       struct nlattr *actions,
> +                                       struct dpif_flow_stats *stats,
> +                                       ovs_u128 *ufid,
> +                                       struct dpif_flow *flow,
> +                                       bool terse OVS_UNUSED)
> +{
> +
> +    struct odp_flow_key_parms odp_parms = {
> +        .flow = &match->flow,
> +        .mask = &match->wc.masks,
> +        .support = netdev_flow_support,
> +    };
> +    size_t offset;
> +
> +    memset(flow, 0, sizeof *flow);
> +
> +    /* Key */
> +    offset = key_buf->size;
> +    flow->key = ofpbuf_tail(key_buf);
> +    odp_flow_key_from_flow(&odp_parms, key_buf);
> +    flow->key_len = key_buf->size - offset;
> +
> +    /* Mask */
> +    offset = mask_buf->size;
> +    flow->mask = ofpbuf_tail(mask_buf);
> +    odp_parms.key_buf = key_buf;
> +    odp_flow_key_from_mask(&odp_parms, mask_buf);
> +    flow->mask_len = mask_buf->size - offset;
> +
> +    /* Actions */
> +    flow->actions = nl_attr_get(actions);
> +    flow->actions_len = nl_attr_get_size(actions);
> +
> +    /* Stats */
> +    memcpy(&flow->stats, stats, sizeof *stats);
> +
> +    /* UFID */
> +    flow->ufid_present = true;
> +    flow->ufid = *ufid;
> +
> +    flow->pmd_id = PMD_ID_NULL;
> +    return 0;
> +}
> +
>  static int
>  dpif_netlink_flow_dump_next(struct dpif_flow_dump_thread *thread_,
>                              struct dpif_flow *flows, int max_flows)
> @@ -1492,14 +1631,52 @@ dpif_netlink_flow_dump_next(struct dpif_flow_dump_thread *thread_,
>          = dpif_netlink_flow_dump_thread_cast(thread_);
>      struct dpif_netlink_flow_dump *dump = thread->dump;
>      struct dpif_netlink *dpif = dpif_netlink_cast(thread->up.dpif);
> +    int flow_limit = MIN(max_flows, FLOW_DUMP_MAX_BATCH);

We might consider overriding the value of max_flows just after
initializing n_flows below, instead of introducing another variable
named "flow_limit"; "flow_limit" implies something else in other parts
of the codebase.
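
For illustration, folding the clamp into 'max_flows' itself could look
like this (a sketch only):

    int n_flows;

    ofpbuf_delete(thread->nl_actions);
    thread->nl_actions = NULL;

    /* Clamp the caller's limit to the capacity of the per-thread
     * key/mask/action buffers instead of carrying a separate
     * 'flow_limit' variable. */
    max_flows = MIN(max_flows, FLOW_DUMP_MAX_BATCH);
    n_flows = 0;

    while (!thread->netdev_done && n_flows < max_flows) {
        /* ... dump netdev flows as before ... */
    }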

>      int n_flows;
>
>      ofpbuf_delete(thread->nl_actions);
>      thread->nl_actions = NULL;
>
>      n_flows = 0;
> +
> +    while (!thread->netdev_done && n_flows < flow_limit) {
> +        struct odputil_keybuf *maskbuf = &thread->maskbuf[n_flows];
> +        struct odputil_keybuf *keybuf = &thread->keybuf[n_flows];
> +        struct odputil_keybuf *actbuf = &thread->actbuf[n_flows];
> +        struct ofpbuf key, mask, act;
> +        struct dpif_flow *f = &flows[n_flows];
> +        int cur = thread->netdev_dump_idx;
> +        struct netdev_flow_dump *netdev_dump = dump->netdev_dumps[cur];
> +        struct match match;
> +        struct nlattr *actions;
> +        struct dpif_flow_stats stats;
> +        ovs_u128 ufid;
> +        bool has_next;
> +
> +        ofpbuf_use_stack(&key, keybuf, sizeof *keybuf);
> +        ofpbuf_use_stack(&act, actbuf, sizeof *actbuf);
> +        ofpbuf_use_stack(&mask, maskbuf, sizeof *maskbuf);
> +        has_next = netdev_flow_dump_next(netdev_dump, &match,
> +                                        &actions, &stats,
> +                                        &ufid,
> +                                        &thread->nl_flows,
> +                                        &act);
> +        if (has_next) {
> +            dpif_netlink_netdev_match_to_dpif_flow(&match,
> +                                                   &key, &mask,
> +                                                   actions,
> +                                                   &stats,
> +                                                   &ufid,
> +                                                   f,
> +                                                   dump->up.terse);
> +            n_flows++;
> +        } else {
> +            dpif_netlink_advance_netdev_dump(thread);
> +        }
> +    }
> +
>      while (!n_flows
> -           || (n_flows < max_flows && thread->nl_flows.size)) {
> +           || (n_flows < flow_limit && thread->nl_flows.size)) {
>          struct dpif_netlink_flow datapath_flow;
>          struct ofpbuf nl_flow;
>          int error;
> diff --git a/lib/netdev.c b/lib/netdev.c
> index c3261fd..8093508 100644
> --- a/lib/netdev.c
> +++ b/lib/netdev.c
> @@ -2239,6 +2239,38 @@ netdev_ports_flow_flush(const void *obj)
>      }
>  }
>
> +struct netdev_flow_dump **
> +netdev_ports_flow_dumps_create(const void *obj, int *ports)
> +{
> +    struct port_to_netdev_data *data;
> +    struct netdev_flow_dump **dumps;
> +    int count = 0;
> +    int i = 0;
> +
> +    HMAP_FOR_EACH(data, node, &port_to_netdev) {
> +        if (data->obj == obj) {
> +            count++;
> +        }
> +    }
> +
> +    dumps = count ? xzalloc(sizeof *dumps * count) : NULL;

If this function took a 'struct netdev_ports', as per my earlier
suggestion, the several counting lines above could be replaced with a
single call to hmap_count(); see the sketch below.
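
For illustration, with a per-dpif 'struct netdev_ports' wrapping the
hmap (hypothetical here; the struct name and its 'map' member follow
that earlier suggestion and are not part of this patch), the function
could shrink to something like:

    struct netdev_flow_dump **
    netdev_ports_flow_dumps_create(struct netdev_ports *ports, int *num)
    {
        struct port_to_netdev_data *data;
        struct netdev_flow_dump **dumps;
        size_t count = hmap_count(&ports->map); /* Replaces the counting loop. */
        int i = 0;

        dumps = count ? xzalloc(sizeof *dumps * count) : NULL;

        /* Every entry in a per-dpif map belongs to this dpif, so the
         * 'data->obj == obj' filtering also goes away. */
        HMAP_FOR_EACH (data, node, &ports->map) {
            if (!netdev_flow_dump_create(data->netdev, &dumps[i])) {
                dumps[i]->port = data->dpif_port.port_no;
                i++;
            }
        }

        *num = i;
        return dumps;
    }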

> +
> +    HMAP_FOR_EACH(data, node, &port_to_netdev) {
> +        if (data->obj == obj) {
> +            int err = netdev_flow_dump_create(data->netdev, &dumps[i]);
> +            if (err) {
> +                continue;
> +            }
> +
> +            dumps[i]->port = data->dpif_port.port_no;
> +            i++;
> +        }
> +    }
> +
> +    *ports = i;
> +    return dumps;
> +}
> +
>  bool netdev_flow_api_enabled = false;
>
>  #ifdef __linux__
> --
> 2.7.4
>
Roi Dayan April 19, 2017, 2:38 p.m. UTC | #2
On 14/04/2017 03:51, Joe Stringer wrote:
> On 7 April 2017 at 06:12, Roi Dayan <roid@mellanox.com> wrote:
[...]
>> +static struct odp_support netdev_flow_support = {
>> +    .max_mpls_depth = SIZE_MAX,
>> +    .recirc = false,
>> +    .ct_state = false,
>> +    .ct_zone = false,
>> +    .ct_mark = false,
>> +    .ct_label = false,
>> +};
>
> Perhaps we should add a comment above this structure to state how it's
> only used for translating netdev flows into dpif flows, and current
> implementations of the netdev flow interface don't support these
> features so it's not relevant to serialize those fields into the
> nlattr-formatted buffers in the function below?

I think we don't actually need this. We can just reset the support
flags to zero, since we don't support any of those attributes for now.
We'll bring this back when we actually support them.
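
A minimal sketch of that simplification (assuming the key/mask
serializers accept an all-zero odp_support; whether max_mpls_depth
still needs a nonzero value would have to be checked):

    /* Drop the static netdev_flow_support table and rely on the
     * zero-initialized support flags, since none of the recirc/ct_*
     * attributes are offloadable for now. */
    struct odp_flow_key_parms odp_parms = {
        .flow = &match->flow,
        .mask = &match->wc.masks,
        /* '.support' is left all-zero by the designated initializer. */
    };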

[...]
>> @@ -1492,14 +1631,52 @@ dpif_netlink_flow_dump_next(struct dpif_flow_dump_thread *thread_,
>>          = dpif_netlink_flow_dump_thread_cast(thread_);
>>      struct dpif_netlink_flow_dump *dump = thread->dump;
>>      struct dpif_netlink *dpif = dpif_netlink_cast(thread->up.dpif);
>> +    int flow_limit = MIN(max_flows, FLOW_DUMP_MAX_BATCH);
>
> We might consider just overriding the value of max_flows just after
> initializing n_flows below instead of having another variable named
> "flow_limit"; "flow_limit" in other parts of the codebase implies
> something else.

OK, thanks.

[...]
>> +struct netdev_flow_dump **
>> +netdev_ports_flow_dumps_create(const void *obj, int *ports)
>> +{
>> +    struct port_to_netdev_data *data;
>> +    struct netdev_flow_dump **dumps;
>> +    int count = 0;
>> +    int i = 0;
>> +
>> +    HMAP_FOR_EACH(data, node, &port_to_netdev) {
>> +        if (data->obj == obj) {
>> +            count++;
>> +        }
>> +    }
>> +
>> +    dumps = count ? xzalloc(sizeof *dumps * count) : NULL;
>
> If this function took a 'struct netdev_ports' as per my earlier
> suggestion, the above several lines could be a call to hmap_count().

Great, thanks.

Joe Stringer April 19, 2017, 6:26 p.m. UTC | #3
On 19 April 2017 at 07:38, Roi Dayan <roid@mellanox.com> wrote:
>
>
> On 14/04/2017 03:51, Joe Stringer wrote:
>>
>> On 7 April 2017 at 06:12, Roi Dayan <roid@mellanox.com> wrote:
[...]
>>> +static struct odp_support netdev_flow_support = {
>>> +    .max_mpls_depth = SIZE_MAX,
>>> +    .recirc = false,
>>> +    .ct_state = false,
>>> +    .ct_zone = false,
>>> +    .ct_mark = false,
>>> +    .ct_label = false,
>>> +};
>>
>>
>> Perhaps we should add a comment above this structure to state how it's
>> only used for translating netdev flows into dpif flows, and current
>> implementations of the netdev flow interface don't support these
>> features so it's not relevant to serialize those fields into the
>> nlattr-formatted buffers in the function below?
>
>
> I think we don't actually need this. can just reset support to zero as we
> don't support any of those attributes for now.
> We'll bring this back when we actually will support those.

OK sure, that works too.

Patch

diff --git a/lib/dpif-netlink.c b/lib/dpif-netlink.c
index 02dd6c2..4e3bc8c 100644
--- a/lib/dpif-netlink.c
+++ b/lib/dpif-netlink.c
@@ -38,6 +38,7 @@ 
 #include "flow.h"
 #include "fat-rwlock.h"
 #include "netdev.h"
+#include "netdev-provider.h"
 #include "netdev-linux.h"
 #include "netdev-vport.h"
 #include "netlink-conntrack.h"
@@ -55,6 +56,7 @@ 
 #include "unaligned.h"
 #include "util.h"
 #include "openvswitch/vlog.h"
+#include "openvswitch/match.h"
 
 VLOG_DEFINE_THIS_MODULE(dpif_netlink);
 #ifdef _WIN32
@@ -72,6 +74,8 @@  enum { MAX_PORTS = USHRT_MAX };
  * missing if we have old headers. */
 #define ETH_FLAG_LRO      (1 << 15)    /* LRO is enabled */
 
+#define FLOW_DUMP_MAX_BATCH 50
+
 struct dpif_netlink_dp {
     /* Generic Netlink header. */
     uint8_t cmd;
@@ -1373,6 +1377,10 @@  struct dpif_netlink_flow_dump {
     struct dpif_flow_dump up;
     struct nl_dump nl_dump;
     atomic_int status;
+    struct netdev_flow_dump **netdev_dumps;
+    int netdev_dumps_num;                    /* Number of netdev_flow_dumps. */
+    struct ovs_mutex netdev_lock;            /* Guards the following. */
+    int netdev_current_dump OVS_GUARDED;     /* Shared current dump index. */
 };
 
 static struct dpif_netlink_flow_dump *
@@ -1381,6 +1389,26 @@  dpif_netlink_flow_dump_cast(struct dpif_flow_dump *dump)
     return CONTAINER_OF(dump, struct dpif_netlink_flow_dump, up);
 }
 
+static void
+start_netdev_dump(const struct dpif *dpif_,
+                  struct dpif_netlink_flow_dump *dump)
+{
+    ovs_mutex_init(&dump->netdev_lock);
+
+    if (!netdev_flow_api_enabled) {
+        dump->netdev_dumps_num = 0;
+        dump->netdev_dumps = NULL;
+        return;
+    }
+
+    ovs_mutex_lock(&dump->netdev_lock);
+    dump->netdev_current_dump = 0;
+    dump->netdev_dumps
+        = netdev_ports_flow_dumps_create(DPIF_HMAP_KEY(dpif_),
+                                         &dump->netdev_dumps_num);
+    ovs_mutex_unlock(&dump->netdev_lock);
+}
+
 static struct dpif_flow_dump *
 dpif_netlink_flow_dump_create(const struct dpif *dpif_, bool terse)
 {
@@ -1405,6 +1433,8 @@  dpif_netlink_flow_dump_create(const struct dpif *dpif_, bool terse)
     atomic_init(&dump->status, 0);
     dump->up.terse = terse;
 
+    start_netdev_dump(dpif_, dump);
+
     return &dump->up;
 }
 
@@ -1415,6 +1445,16 @@  dpif_netlink_flow_dump_destroy(struct dpif_flow_dump *dump_)
     unsigned int nl_status = nl_dump_done(&dump->nl_dump);
     int dump_status;
 
+    for (int i = 0; i < dump->netdev_dumps_num; i++) {
+        int err = netdev_flow_dump_destroy(dump->netdev_dumps[i]);
+        if (err != 0 && err != EOPNOTSUPP) {
+            VLOG_ERR("failed dumping netdev: %s", ovs_strerror(err));
+        }
+    }
+
+    free(dump->netdev_dumps);
+    ovs_mutex_destroy(&dump->netdev_lock);
+
     /* No other thread has access to 'dump' at this point. */
     atomic_read_relaxed(&dump->status, &dump_status);
     free(dump);
@@ -1428,6 +1468,13 @@  struct dpif_netlink_flow_dump_thread {
     struct dpif_flow_stats stats;
     struct ofpbuf nl_flows;     /* Always used to store flows. */
     struct ofpbuf *nl_actions;  /* Used if kernel does not supply actions. */
+    int netdev_dump_idx;        /* This thread's current netdev dump index. */
+    bool netdev_done;           /* True if we are finished dumping netdevs. */
+
+    /* Key/mask/actions buffers for netdev dumping. */
+    struct odputil_keybuf keybuf[FLOW_DUMP_MAX_BATCH];
+    struct odputil_keybuf maskbuf[FLOW_DUMP_MAX_BATCH];
+    struct odputil_keybuf actbuf[FLOW_DUMP_MAX_BATCH];
 };
 
 static struct dpif_netlink_flow_dump_thread *
@@ -1447,6 +1494,8 @@  dpif_netlink_flow_dump_thread_create(struct dpif_flow_dump *dump_)
     thread->dump = dump;
     ofpbuf_init(&thread->nl_flows, NL_DUMP_BUFSIZE);
     thread->nl_actions = NULL;
+    thread->netdev_dump_idx = 0;
+    thread->netdev_done = !(thread->netdev_dump_idx < dump->netdev_dumps_num);
 
     return &thread->up;
 }
@@ -1484,6 +1533,96 @@  dpif_netlink_flow_to_dpif_flow(struct dpif *dpif, struct dpif_flow *dpif_flow,
     dpif_netlink_flow_get_stats(datapath_flow, &dpif_flow->stats);
 }
 
+/* The design is such that all threads work together through the dumps in
+ * order, from the first to the last (at first, they all work on dump 0).
+ * When a thread finds that the current dump is finished, they all move on
+ * to the next one.  If two or more threads find that the same dump is
+ * finished at the same time, the first one advances the shared
+ * netdev_current_dump and the others catch up. */
+static void
+dpif_netlink_advance_netdev_dump(struct dpif_netlink_flow_dump_thread *thread)
+{
+    struct dpif_netlink_flow_dump *dump = thread->dump;
+
+    ovs_mutex_lock(&dump->netdev_lock);
+    /* If we haven't dumped everything yet. */
+    if (dump->netdev_current_dump < dump->netdev_dumps_num) {
+        /* If we are the first to find that the current dump is finished,
+         * advance it. */
+        if (thread->netdev_dump_idx == dump->netdev_current_dump) {
+            thread->netdev_dump_idx = ++dump->netdev_current_dump;
+            /* Did we just finish the last dump?  Then we are done. */
+            if (dump->netdev_current_dump == dump->netdev_dumps_num) {
+                thread->netdev_done = true;
+            }
+        } else {
+            /* Otherwise, we are behind; catch up. */
+            thread->netdev_dump_idx = dump->netdev_current_dump;
+        }
+    } else {
+        /* Some other thread already finished the last dump. */
+        thread->netdev_done = true;
+    }
+    ovs_mutex_unlock(&dump->netdev_lock);
+}
+
+static struct odp_support netdev_flow_support = {
+    .max_mpls_depth = SIZE_MAX,
+    .recirc = false,
+    .ct_state = false,
+    .ct_zone = false,
+    .ct_mark = false,
+    .ct_label = false,
+};
+
+static int
+dpif_netlink_netdev_match_to_dpif_flow(struct match *match,
+                                       struct ofpbuf *key_buf,
+                                       struct ofpbuf *mask_buf,
+                                       struct nlattr *actions,
+                                       struct dpif_flow_stats *stats,
+                                       ovs_u128 *ufid,
+                                       struct dpif_flow *flow,
+                                       bool terse OVS_UNUSED)
+{
+
+    struct odp_flow_key_parms odp_parms = {
+        .flow = &match->flow,
+        .mask = &match->wc.masks,
+        .support = netdev_flow_support,
+    };
+    size_t offset;
+
+    memset(flow, 0, sizeof *flow);
+
+    /* Key */
+    offset = key_buf->size;
+    flow->key = ofpbuf_tail(key_buf);
+    odp_flow_key_from_flow(&odp_parms, key_buf);
+    flow->key_len = key_buf->size - offset;
+
+    /* Mask */
+    offset = mask_buf->size;
+    flow->mask = ofpbuf_tail(mask_buf);
+    odp_parms.key_buf = key_buf;
+    odp_flow_key_from_mask(&odp_parms, mask_buf);
+    flow->mask_len = mask_buf->size - offset;
+
+    /* Actions */
+    flow->actions = nl_attr_get(actions);
+    flow->actions_len = nl_attr_get_size(actions);
+
+    /* Stats */
+    memcpy(&flow->stats, stats, sizeof *stats);
+
+    /* UFID */
+    flow->ufid_present = true;
+    flow->ufid = *ufid;
+
+    flow->pmd_id = PMD_ID_NULL;
+    return 0;
+}
+
 static int
 dpif_netlink_flow_dump_next(struct dpif_flow_dump_thread *thread_,
                             struct dpif_flow *flows, int max_flows)
@@ -1492,14 +1631,52 @@  dpif_netlink_flow_dump_next(struct dpif_flow_dump_thread *thread_,
         = dpif_netlink_flow_dump_thread_cast(thread_);
     struct dpif_netlink_flow_dump *dump = thread->dump;
     struct dpif_netlink *dpif = dpif_netlink_cast(thread->up.dpif);
+    int flow_limit = MIN(max_flows, FLOW_DUMP_MAX_BATCH);
     int n_flows;
 
     ofpbuf_delete(thread->nl_actions);
     thread->nl_actions = NULL;
 
     n_flows = 0;
+
+    while (!thread->netdev_done && n_flows < flow_limit) {
+        struct odputil_keybuf *maskbuf = &thread->maskbuf[n_flows];
+        struct odputil_keybuf *keybuf = &thread->keybuf[n_flows];
+        struct odputil_keybuf *actbuf = &thread->actbuf[n_flows];
+        struct ofpbuf key, mask, act;
+        struct dpif_flow *f = &flows[n_flows];
+        int cur = thread->netdev_dump_idx;
+        struct netdev_flow_dump *netdev_dump = dump->netdev_dumps[cur];
+        struct match match;
+        struct nlattr *actions;
+        struct dpif_flow_stats stats;
+        ovs_u128 ufid;
+        bool has_next;
+
+        ofpbuf_use_stack(&key, keybuf, sizeof *keybuf);
+        ofpbuf_use_stack(&act, actbuf, sizeof *actbuf);
+        ofpbuf_use_stack(&mask, maskbuf, sizeof *maskbuf);
+        has_next = netdev_flow_dump_next(netdev_dump, &match,
+                                        &actions, &stats,
+                                        &ufid,
+                                        &thread->nl_flows,
+                                        &act);
+        if (has_next) {
+            dpif_netlink_netdev_match_to_dpif_flow(&match,
+                                                   &key, &mask,
+                                                   actions,
+                                                   &stats,
+                                                   &ufid,
+                                                   f,
+                                                   dump->up.terse);
+            n_flows++;
+        } else {
+            dpif_netlink_advance_netdev_dump(thread);
+        }
+    }
+
     while (!n_flows
-           || (n_flows < max_flows && thread->nl_flows.size)) {
+           || (n_flows < flow_limit && thread->nl_flows.size)) {
         struct dpif_netlink_flow datapath_flow;
         struct ofpbuf nl_flow;
         int error;
diff --git a/lib/netdev.c b/lib/netdev.c
index c3261fd..8093508 100644
--- a/lib/netdev.c
+++ b/lib/netdev.c
@@ -2239,6 +2239,38 @@  netdev_ports_flow_flush(const void *obj)
     }
 }
 
+struct netdev_flow_dump **
+netdev_ports_flow_dumps_create(const void *obj, int *ports)
+{
+    struct port_to_netdev_data *data;
+    struct netdev_flow_dump **dumps;
+    int count = 0;
+    int i = 0;
+
+    HMAP_FOR_EACH(data, node, &port_to_netdev) {
+        if (data->obj == obj) {
+            count++;
+        }
+    }
+
+    dumps = count ? xzalloc(sizeof *dumps * count) : NULL;
+
+    HMAP_FOR_EACH(data, node, &port_to_netdev) {
+        if (data->obj == obj) {
+            int err = netdev_flow_dump_create(data->netdev, &dumps[i]);
+            if (err) {
+                continue;
+            }
+
+            dumps[i]->port = data->dpif_port.port_no;
+            i++;
+        }
+    }
+
+    *ports = i;
+    return dumps;
+}
+
 bool netdev_flow_api_enabled = false;
 
 #ifdef __linux__