@@ -39,6 +39,7 @@
#include "flow.h"
#include "fat-rwlock.h"
#include "netdev.h"
+#include "netdev-provider.h"
#include "netdev-linux.h"
#include "netdev-vport.h"
#include "netlink-conntrack.h"
@@ -71,6 +72,8 @@ enum { MAX_PORTS = USHRT_MAX };
* missing if we have old headers. */
#define ETH_FLAG_LRO (1 << 15) /* LRO is enabled */
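+/* Maximum number of flows returned in one flow dump batch.  Also sizes the
+ * per-thread buffers used while dumping flows from netdevs. */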
+#define FLOW_DUMP_MAX_BATCH 50
+
struct dpif_netlink_dp {
/* Generic Netlink header. */
uint8_t cmd;
@@ -1445,6 +1448,10 @@ struct dpif_netlink_flow_dump {
struct dpif_flow_dump up;
struct nl_dump nl_dump;
atomic_int status;
+ struct netdev_flow_dump **netdev_dumps; /* Array of netdev flow dumps. */
+ int netdev_dumps_num; /* Number of netdev_flow_dumps. */
+ struct ovs_mutex netdev_lock; /* Guards the following. */
+ int netdev_current_dump OVS_GUARDED; /* Shared current dump index. */
};
static struct dpif_netlink_flow_dump *
@@ -1453,6 +1460,26 @@ dpif_netlink_flow_dump_cast(struct dpif_flow_dump *dump)
return CONTAINER_OF(dump, struct dpif_netlink_flow_dump, up);
}
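+/* If the netdev flow API is enabled, creates a netdev flow dump for each
+ * netdev port registered with 'dpif_' and stores them in 'dump'.  Otherwise
+ * leaves 'dump' without any netdev dumps. */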
+static void
+start_netdev_dump(const struct dpif *dpif_,
+ struct dpif_netlink_flow_dump *dump)
+{
+ ovs_mutex_init(&dump->netdev_lock);
+
+ if (!netdev_is_flow_api_enabled()) {
+ dump->netdev_dumps_num = 0;
+ dump->netdev_dumps = NULL;
+ return;
+ }
+
+ ovs_mutex_lock(&dump->netdev_lock);
+ dump->netdev_current_dump = 0;
+ dump->netdev_dumps
+ = netdev_ports_flow_dump_create(DPIF_HMAP_KEY(dpif_),
+ &dump->netdev_dumps_num);
+ ovs_mutex_unlock(&dump->netdev_lock);
+}
+
static struct dpif_flow_dump *
dpif_netlink_flow_dump_create(const struct dpif *dpif_, bool terse)
{
@@ -1477,6 +1504,8 @@ dpif_netlink_flow_dump_create(const struct dpif *dpif_, bool terse)
atomic_init(&dump->status, 0);
dump->up.terse = terse;
+ start_netdev_dump(dpif_, dump);
+
return &dump->up;
}
@@ -1487,6 +1516,17 @@ dpif_netlink_flow_dump_destroy(struct dpif_flow_dump *dump_)
unsigned int nl_status = nl_dump_done(&dump->nl_dump);
int dump_status;
+ for (int i = 0; i < dump->netdev_dumps_num; i++) {
+ int err = netdev_flow_dump_destroy(dump->netdev_dumps[i]);
+
+ if (err != 0 && err != EOPNOTSUPP) {
+ VLOG_ERR("failed dumping netdev: %s", ovs_strerror(err));
+ }
+ }
+
+ free(dump->netdev_dumps);
+ ovs_mutex_destroy(&dump->netdev_lock);
+
/* No other thread has access to 'dump' at this point. */
atomic_read_relaxed(&dump->status, &dump_status);
free(dump);
@@ -1500,6 +1540,13 @@ struct dpif_netlink_flow_dump_thread {
struct dpif_flow_stats stats;
struct ofpbuf nl_flows; /* Always used to store flows. */
struct ofpbuf *nl_actions; /* Used if kernel does not supply actions. */
+ int netdev_dump_idx; /* This thread's current netdev dump index. */
+ bool netdev_done; /* True if we are done dumping netdevs. */
+
+ /* Key/mask/actions buffers for netdev dumping. */
+ struct odputil_keybuf keybuf[FLOW_DUMP_MAX_BATCH];
+ struct odputil_keybuf maskbuf[FLOW_DUMP_MAX_BATCH];
+ struct odputil_keybuf actbuf[FLOW_DUMP_MAX_BATCH];
};
static struct dpif_netlink_flow_dump_thread *
@@ -1519,6 +1566,8 @@ dpif_netlink_flow_dump_thread_create(struct dpif_flow_dump *dump_)
thread->dump = dump;
ofpbuf_init(&thread->nl_flows, NL_DUMP_BUFSIZE);
thread->nl_actions = NULL;
+ thread->netdev_dump_idx = 0;
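+ /* If there are no netdev dumps, this thread is already done with them. */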
+ thread->netdev_done = !(thread->netdev_dump_idx < dump->netdev_dumps_num);
return &thread->up;
}
@@ -1556,6 +1605,89 @@ dpif_netlink_flow_to_dpif_flow(struct dpif *dpif, struct dpif_flow *dpif_flow,
dpif_netlink_flow_get_stats(datapath_flow, &dpif_flow->stats);
}
+/* The design is such that all threads work together on the netdev dumps in
+ * order, from the first to the last (at first they all start on dump 0).
+ * When a thread finds that the dump it is working on is finished, it
+ * advances to the next one.  If two or more threads find the same dump
+ * finished at the same time, the first one advances the shared
+ * netdev_current_dump and the others catch up. */
+static void
+dpif_netlink_advance_netdev_dump(struct dpif_netlink_flow_dump_thread *thread)
+{
+ struct dpif_netlink_flow_dump *dump = thread->dump;
+
+ ovs_mutex_lock(&dump->netdev_lock);
+ /* If we haven't finished (dumped everything) yet. */
+ if (dump->netdev_current_dump < dump->netdev_dumps_num) {
+ /* If we are the first to find that the current dump is finished,
+ * advance it. */
+ if (thread->netdev_dump_idx == dump->netdev_current_dump) {
+ thread->netdev_dump_idx = ++dump->netdev_current_dump;
+ /* If we just finished the last dump, we are done. */
+ if (dump->netdev_current_dump == dump->netdev_dumps_num) {
+ thread->netdev_done = true;
+ }
+ } else {
+ /* Otherwise we are behind; catch up. */
+ thread->netdev_dump_idx = dump->netdev_current_dump;
+ }
+ } else {
+ /* Some other thread already finished the last dump. */
+ thread->netdev_done = true;
+ }
+ ovs_mutex_unlock(&dump->netdev_lock);
+}
+
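+/* Translates a netdev 'match', 'actions', 'stats' and 'ufid' into a dpif
+ * 'flow'.  The serialized flow key and mask are appended to 'key_buf' and
+ * 'mask_buf', which must remain valid for as long as 'flow' is in use. */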
+static int
+dpif_netlink_netdev_match_to_dpif_flow(struct match *match,
+ struct ofpbuf *key_buf,
+ struct ofpbuf *mask_buf,
+ struct nlattr *actions,
+ struct dpif_flow_stats *stats,
+ ovs_u128 *ufid,
+ struct dpif_flow *flow,
+ bool terse OVS_UNUSED)
+{
+ struct odp_flow_key_parms odp_parms = {
+ .flow = &match->flow,
+ .mask = &match->wc.masks,
+ .support = {
+ .max_vlan_headers = 1,
+ },
+ };
+ size_t offset;
+
+ memset(flow, 0, sizeof *flow);
+
+ /* Key */
+ offset = key_buf->size;
+ flow->key = ofpbuf_tail(key_buf);
+ odp_flow_key_from_flow(&odp_parms, key_buf);
+ flow->key_len = key_buf->size - offset;
+
+ /* Mask */
+ offset = mask_buf->size;
+ flow->mask = ofpbuf_tail(mask_buf);
+ odp_parms.key_buf = key_buf;
+ odp_flow_key_from_mask(&odp_parms, mask_buf);
+ flow->mask_len = mask_buf->size - offset;
+
+ /* Actions */
+ flow->actions = nl_attr_get(actions);
+ flow->actions_len = nl_attr_get_size(actions);
+
+ /* Stats */
+ memcpy(&flow->stats, stats, sizeof *stats);
+
+ /* UFID */
+ flow->ufid_present = true;
+ flow->ufid = *ufid;
+
+ flow->pmd_id = PMD_ID_NULL;
+ return 0;
+}
+
static int
dpif_netlink_flow_dump_next(struct dpif_flow_dump_thread *thread_,
struct dpif_flow *flows, int max_flows)
@@ -1570,6 +1702,44 @@ dpif_netlink_flow_dump_next(struct dpif_flow_dump_thread *thread_,
thread->nl_actions = NULL;
n_flows = 0;
+ max_flows = MIN(max_flows, FLOW_DUMP_MAX_BATCH);
+
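+ /* First dump flows from the netdev dumps; once they are exhausted, fall
+ * through to the kernel datapath flows below. */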
+ while (!thread->netdev_done && n_flows < max_flows) {
+ struct odputil_keybuf *maskbuf = &thread->maskbuf[n_flows];
+ struct odputil_keybuf *keybuf = &thread->keybuf[n_flows];
+ struct odputil_keybuf *actbuf = &thread->actbuf[n_flows];
+ struct ofpbuf key, mask, act;
+ struct dpif_flow *f = &flows[n_flows];
+ int cur = thread->netdev_dump_idx;
+ struct netdev_flow_dump *netdev_dump = dump->netdev_dumps[cur];
+ struct match match;
+ struct nlattr *actions;
+ struct dpif_flow_stats stats;
+ ovs_u128 ufid;
+ bool has_next;
+
+ ofpbuf_use_stack(&key, keybuf, sizeof *keybuf);
+ ofpbuf_use_stack(&act, actbuf, sizeof *actbuf);
+ ofpbuf_use_stack(&mask, maskbuf, sizeof *maskbuf);
+ has_next = netdev_flow_dump_next(netdev_dump, &match,
+ &actions, &stats,
+ &ufid,
+ &thread->nl_flows,
+ &act);
+ if (has_next) {
+ dpif_netlink_netdev_match_to_dpif_flow(&match,
+ &key, &mask,
+ actions,
+ &stats,
+ &ufid,
+ f,
+ dump->up.terse);
+ n_flows++;
+ } else {
+ dpif_netlink_advance_netdev_dump(thread);
+ }
+ }
+
while (!n_flows
|| (n_flows < max_flows && thread->nl_flows.size)) {
struct dpif_netlink_flow datapath_flow;
@@ -2273,6 +2273,39 @@ netdev_ports_flow_flush(const void *obj)
ovs_mutex_unlock(&netdev_hmap_mutex);
}
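+/* Creates a netdev flow dump for every netdev registered under 'obj'.
+ * Returns an array of the dumps and stores the number of dumps in '*ports';
+ * the caller owns the array and is responsible for destroying the dumps and
+ * freeing the array. */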
+struct netdev_flow_dump **
+netdev_ports_flow_dump_create(const void *obj, int *ports)
+{
+ struct port_to_netdev_data *data;
+ struct netdev_flow_dump **dumps;
+ int count = 0;
+ int i = 0;
+
+ ovs_mutex_lock(&netdev_hmap_mutex);
+ HMAP_FOR_EACH (data, node, &port_to_netdev) {
+ if (data->obj == obj) {
+ count++;
+ }
+ }
+
+ dumps = count ? xzalloc(sizeof *dumps * count) : NULL;
+
+ HMAP_FOR_EACH (data, node, &port_to_netdev) {
+ if (data->obj == obj) {
+ if (netdev_flow_dump_create(data->netdev, &dumps[i])) {
+ continue;
+ }
+
+ dumps[i]->port = data->dpif_port.port_no;
+ i++;
+ }
+ }
+ ovs_mutex_unlock(&netdev_hmap_mutex);
+
+ *ports = i;
+ return dumps;
+}
+
#ifdef __linux__
void
netdev_set_flow_api_enabled(const struct smap *ovs_other_config)
@@ -187,6 +187,8 @@ int netdev_ports_insert(struct netdev *, const void *obj, struct dpif_port *);
struct netdev *netdev_ports_get(odp_port_t port, const void *obj);
int netdev_ports_remove(odp_port_t port, const void *obj);
odp_port_t netdev_ifindex_to_odp_port(int ifindex);
+struct netdev_flow_dump **netdev_ports_flow_dump_create(const void *obj,
+ int *ports);
void netdev_ports_flow_flush(const void *obj);
/* native tunnel APIs */