@@ -3052,7 +3052,8 @@ dpif_netdev_execute(struct dpif *dpif, struct dpif_execute *execute)
}
static void
-dpif_netdev_operate(struct dpif *dpif, struct dpif_op **ops, size_t n_ops)
+dpif_netdev_operate(struct dpif *dpif, struct dpif_op **ops, size_t n_ops,
+ enum dpif_op_skip_type skip_flag OVS_UNUSED)
{
size_t i;
@@ -2271,7 +2271,8 @@ dpif_netlink_operate_chunks(struct dpif_netlink *dpif, struct dpif_op **ops,
}
static void
-dpif_netlink_operate(struct dpif *dpif_, struct dpif_op **ops, size_t n_ops)
+dpif_netlink_operate(struct dpif *dpif_, struct dpif_op **ops, size_t n_ops,
+ enum dpif_op_skip_type skip_flag)
{
struct dpif_netlink *dpif = dpif_netlink_cast(dpif_);
struct dpif_op *new_ops[OPERATE_MAX_OPS];
@@ -2279,7 +2280,7 @@ dpif_netlink_operate(struct dpif *dpif_, struct dpif_op **ops, size_t n_ops)
int i = 0;
int err = 0;
- if (netdev_is_flow_api_enabled()) {
+ if (skip_flag != DPIF_OP_SKIP_OFFLOAD && netdev_is_flow_api_enabled()) {
while (n_ops > 0) {
count = 0;
@@ -2288,6 +2289,10 @@ dpif_netlink_operate(struct dpif *dpif_, struct dpif_op **ops, size_t n_ops)
err = try_send_to_netdev(dpif, op);
if (err && err != EEXIST) {
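+ /* Offload-only mode: report the failure to the caller instead of
+ * falling back to the kernel datapath. */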
+ if (skip_flag == DPIF_OP_SKIP_DP) {
+ op->error = err;
+ return;
+ }
new_ops[count++] = op;
} else {
op->error = err;
@@ -2298,8 +2303,11 @@ dpif_netlink_operate(struct dpif *dpif_, struct dpif_op **ops, size_t n_ops)
dpif_netlink_operate_chunks(dpif, new_ops, count);
}
- } else {
+ } else if (skip_flag != DPIF_OP_SKIP_DP) {
dpif_netlink_operate_chunks(dpif, ops, n_ops);
+ } else {
+ VLOG_ERR("%s: invalid skip_flag %d while flow API is disabled",
+ __func__, skip_flag);
}
}
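For reference, the resulting dispatch in dpif_netlink_operate() is:

    skip_flag             flow API enabled                  flow API disabled
    DPIF_OP_SKIP_NONE     offload, fall back to datapath    datapath only
    DPIF_OP_SKIP_OFFLOAD  datapath only                     datapath only
    DPIF_OP_SKIP_DP       offload only (op->error on fail)  VLOG_ERR only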
@@ -296,12 +296,13 @@ struct dpif_class {
int (*flow_dump_next)(struct dpif_flow_dump_thread *thread,
struct dpif_flow *flows, int max_flows);
-
/* Executes each of the 'n_ops' operations in 'ops' on 'dpif', in the order
* in which they are specified, placing each operation's results in the
* "output" members documented in comments and the 'error' member of each
- * dpif_op. */
- void (*operate)(struct dpif *dpif, struct dpif_op **ops, size_t n_ops);
+ * dpif_op. The 'skip_flag' argument tells the provider whether 'ops'
+ * should be offloaded to a netdev, executed in the kernel datapath, or
+ * both. */
+ void (*operate)(struct dpif *dpif, struct dpif_op **ops, size_t n_ops,
+ enum dpif_op_skip_type skip_flag);
/* Enables or disables receiving packets with dpif_recv() for 'dpif'.
* Turning packet receive off and then back on is allowed to change Netlink
@@ -49,6 +49,7 @@
#include "valgrind.h"
#include "openvswitch/ofp-errors.h"
#include "openvswitch/vlog.h"
+#include "lib/netdev-provider.h"
VLOG_DEFINE_THIS_MODULE(dpif);
@@ -1010,7 +1011,7 @@ dpif_flow_get(struct dpif *dpif,
op.flow_get.flow->key_len = key_len;
opp = &op;
- dpif_operate(dpif, &opp, 1);
+ dpif_operate(dpif, &opp, 1, DPIF_OP_SKIP_NONE);
return op.error;
}
@@ -1040,7 +1041,7 @@ dpif_flow_put(struct dpif *dpif, enum dpif_flow_put_flags flags,
op.flow_put.stats = stats;
opp = &op;
- dpif_operate(dpif, &opp, 1);
+ dpif_operate(dpif, &opp, 1, DPIF_OP_SKIP_NONE);
return op.error;
}
@@ -1063,7 +1064,7 @@ dpif_flow_del(struct dpif *dpif,
op.flow_del.terse = false;
opp = &op;
- dpif_operate(dpif, &opp, 1);
+ dpif_operate(dpif, &opp, 1, DPIF_OP_SKIP_NONE);
return op.error;
}
@@ -1320,7 +1321,7 @@ dpif_execute(struct dpif *dpif, struct dpif_execute *execute)
op.execute = *execute;
opp = &op;
- dpif_operate(dpif, &opp, 1);
+ dpif_operate(dpif, &opp, 1, DPIF_OP_SKIP_NONE);
return op.error;
} else {
@@ -1331,9 +1332,12 @@ dpif_execute(struct dpif *dpif, struct dpif_execute *execute)
/* Executes each of the 'n_ops' operations in 'ops' on 'dpif', in the order in
* which they are specified. Places each operation's results in the "output"
* members documented in comments, and 0 in the 'error' member on success or a
- * positive errno on failure. */
+ * positive errno on failure. The 'skip_flag' argument tells the provider
+ * whether 'ops' should be offloaded to a netdev, executed in the kernel
+ * datapath, or both. */
void
-dpif_operate(struct dpif *dpif, struct dpif_op **ops, size_t n_ops)
+dpif_operate(struct dpif *dpif, struct dpif_op **ops, size_t n_ops,
+ enum dpif_op_skip_type skip_flag)
{
while (n_ops > 0) {
size_t chunk;
@@ -1355,7 +1359,7 @@ dpif_operate(struct dpif *dpif, struct dpif_op **ops, size_t n_ops)
* handle itself, without help. */
size_t i;
- dpif->dpif_class->operate(dpif, ops, chunk);
+ dpif->dpif_class->operate(dpif, ops, chunk, skip_flag);
for (i = 0; i < chunk; i++) {
struct dpif_op *op = ops[i];
@@ -1652,7 +1656,7 @@ dpif_queue_to_priority(const struct dpif *dpif, uint32_t queue_id,
log_operation(dpif, "queue_to_priority", error);
return error;
}
-
+
void
dpif_init(struct dpif *dpif, const struct dpif_class *dpif_class,
const char *name,
@@ -614,6 +614,23 @@ enum dpif_op_type {
DPIF_OP_FLOW_GET,
};
+/* Values for the 'skip_flag' argument to the (*operate) interface. */
+enum dpif_op_skip_type {
+ DPIF_OP_SKIP_NONE = 1, /* If offload is enabled, try to offload first;
+ * if that fails, fall back to the datapath.
+ * If offload is disabled, go to the datapath. */
+
+ DPIF_OP_SKIP_OFFLOAD, /* Go to the datapath only; do not offload even
+ * if offload is enabled. */
+
+ DPIF_OP_SKIP_DP /* Offload only; do not fall back to the datapath
+ * even if offload is enabled and the offload
+ * fails. */
+};
+
/* Add or modify a flow.
*
* The flow is specified by the Netlink attributes with types OVS_KEY_ATTR_* in
@@ -768,7 +785,8 @@ struct dpif_op {
};
};
-void dpif_operate(struct dpif *, struct dpif_op **ops, size_t n_ops);
+void dpif_operate(struct dpif *, struct dpif_op **ops, size_t n_ops,
+ enum dpif_op_skip_type skip_flag);
/* Upcalls. */
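As a caller-side illustration of the new argument (a minimal sketch; 'dpif',
'ops', and 'n_ops' stand for whatever the caller already has at hand):

    /* Offload if possible, otherwise fall back to the datapath. */
    dpif_operate(dpif, ops, n_ops, DPIF_OP_SKIP_NONE);

    /* Kernel datapath only, e.g. when evicting a flow from hardware. */
    dpif_operate(dpif, ops, n_ops, DPIF_OP_SKIP_OFFLOAD);

    /* Hardware offload only; each op's 'error' reports failures such as
     * ENOSPC, with no datapath fallback. */
    dpif_operate(dpif, ops, n_ops, DPIF_OP_SKIP_DP);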
@@ -38,6 +38,8 @@ struct netdev_tnl_build_header_params;
/* Offload-capable (HW) netdev information */
struct netdev_hw_info {
bool oor; /* Out of Offload Resources ? */
+ int offload_count; /* Number of flows offloaded to this netdev; used
+ * by the offload rebalancing logic. */
+ int pending_count; /* Number of flows pending offload to this netdev;
+ * used by the offload rebalancing logic. */
};
/* A network device (e.g. an Ethernet device).
@@ -85,7 +87,6 @@ struct netdev {
int n_rxq;
struct shash_node *node; /* Pointer to element in global map. */
struct ovs_list saved_flags_list; /* Contains "struct netdev_saved_flags". */
-
struct netdev_hw_info hw_info; /* offload-capable netdev info */
};
@@ -22,6 +22,7 @@
#include "connmgr.h"
#include "coverage.h"
#include "cmap.h"
+#include "lib/dpif-provider.h"
#include "dpif.h"
#include "openvswitch/dynamic-string.h"
#include "fail-open.h"
@@ -42,7 +43,6 @@
#include "tunnel.h"
#include "unixctl.h"
#include "openvswitch/vlog.h"
-#include "lib/dpif-provider.h"
#include "lib/netdev-provider.h"
#define MAX_QUEUE_LENGTH 512
@@ -399,6 +399,18 @@ static int upcall_receive(struct upcall *, const struct dpif_backer *,
const ovs_u128 *ufid, const unsigned pmd_id);
static void upcall_uninit(struct upcall *);
+static void udpif_flow_rebalance_prepare(struct udpif *udpif,
+ struct udpif_key ***active_flows,
+ int *num_active_flows);
+static void udpif_flow_rebalance(struct udpif *udpif,
+ struct udpif_key **active_flows,
+ int num_active_flows);
+static int udpif_flow_program(struct udpif *udpif, struct udpif_key *ukey,
+ enum dpif_op_skip_type skip_flag);
+static int udpif_flow_unprogram(struct udpif *udpif, struct udpif_key *ukey,
+ enum dpif_op_skip_type skip_flag);
+
static upcall_callback upcall_cb;
static dp_purge_callback dp_purge_cb;
@@ -862,6 +874,25 @@ free_dupcall:
return n_upcalls;
}
+static void
+udpif_run_flow_rebalance(struct udpif *udpif)
+{
+ struct udpif_key **active_flows = NULL;
+ static long long int time = 0;
+ long long int now;
+ int num_active_flows;
+
+ /* Don't rebalance if OFFL_REBAL_INTVL_MSEC msec have not elapsed since
+ * the last rebalance. */
+ now = time_msec();
+ if (now < time + OFFL_REBAL_INTVL_MSEC) {
+ return;
+ }
+ time = now;
+
+ udpif_flow_rebalance_prepare(udpif, &active_flows, &num_active_flows);
+ udpif_flow_rebalance(udpif, active_flows, num_active_flows);
+}
+
static void *
udpif_revalidator(void *arg)
{
@@ -936,6 +967,7 @@ udpif_revalidator(void *arg)
dpif_flow_dump_destroy(udpif->dump);
seq_change(udpif->dump_seq);
+ udpif_run_flow_rebalance(udpif);
duration = MAX(time_msec() - start_time, 1);
udpif->dump_duration = duration;
@@ -1582,7 +1614,7 @@ handle_upcalls(struct udpif *udpif, struct upcall *upcalls,
for (i = 0; i < n_ops; i++) {
opsp[n_opsp++] = &ops[i].dop;
}
- dpif_operate(udpif->dpif, opsp, n_opsp);
+ dpif_operate(udpif->dpif, opsp, n_opsp, DPIF_OP_SKIP_NONE);
for (i = 0; i < n_ops; i++) {
struct udpif_key *ukey = ops[i].ukey;
@@ -1674,7 +1706,7 @@ ukey_create__(const struct nlattr *key, size_t key_len,
ukey->state = UKEY_CREATED;
ukey->state_thread = ovsthread_id_self();
ukey->state_where = OVS_SOURCE_LOCATOR;
- ukey->created = time_msec();
+ ukey->created = ukey->flow_time = time_msec();
memset(&ukey->stats, 0, sizeof ukey->stats);
ukey->stats.used = used;
ukey->xcache = NULL;
@@ -2333,7 +2365,7 @@ push_dp_ops(struct udpif *udpif, struct ukey_op *ops, size_t n_ops)
for (i = 0; i < n_ops; i++) {
opsp[i] = &ops[i].dop;
}
- dpif_operate(udpif->dpif, opsp, n_ops);
+ dpif_operate(udpif->dpif, opsp, n_ops, DPIF_OP_SKIP_NONE);
for (i = 0; i < n_ops; i++) {
struct ukey_op *op = &ops[i];
@@ -2564,6 +2596,16 @@ udpif_flow_time_delta(struct udpif *udpif, struct udpif_key *ukey)
return (udpif->dpif->current_ms - ukey->flow_time) / 1000;
}
+/*
+ * Save backlog packet count while switching modes
+ * between offloaded and kernel datapaths.
+ */
+static void
+udpif_set_ukey_backlog_packets(struct udpif_key *ukey)
+{
+ ukey->flow_backlog_packets = ukey->flow_packets;
+}
+
/* Gather pps-rate for the given dpif_flow and save it in its ukey */
static void
udpif_update_flow_pps(struct udpif *udpif, struct udpif_key *ukey,
@@ -2641,6 +2683,7 @@ revalidate(struct revalidator *revalidator)
kill_them_all = n_dp_flows > flow_limit * 2;
max_idle = n_dp_flows > flow_limit ? 100 : ofproto_max_idle;
+ udpif->dpif->current_ms = time_msec();
for (f = flows; f < &flows[n_dumped]; f++) {
long long int used = f->stats.used;
struct recirc_refs recircs = RECIRC_REFS_EMPTY_INITIALIZER;
@@ -3016,3 +3059,370 @@ upcall_unixctl_purge(struct unixctl_conn *conn, int argc OVS_UNUSED,
}
unixctl_command_reply(conn, "");
}
+
+/*
+ * Walk the umaps and build an array of pointers to ukeys. This array
+ * serves as the input to the udpif_flow_rebalance() routine.
+ */
+static void
+udpif_flow_rebalance_prepare(struct udpif *udpif,
+ struct udpif_key ***active_flows,
+ int *num_active_flows)
+{
+ struct udpif_key **f = NULL;
+ size_t n = 0;
+ size_t i;
+ size_t j;
+
+ if (!active_flows || !num_active_flows) {
+ return;
+ }
+
+ for (i = 0; i < N_UMAPS; i++) {
+ struct umap *umap = &udpif->ukeys[i];
+ n += cmap_count(&umap->cmap);
+ }
+
+ f = xzalloc(n * sizeof(*f));
+
+ for (i = 0, j = 0; i < N_UMAPS; i++) {
+ struct udpif_key *ukey;
+ struct umap *umap = &udpif->ukeys[i];
+
+ CMAP_FOR_EACH (ukey, cmap_node, &umap->cmap) {
+ f[j++] = ukey;
+ if (j >= n) {
+ goto done;
+ }
+ }
+ }
+
+done:
+ *active_flows = f;
+ *num_active_flows = n;
+}
+
+/* Flows are sorted by netdev name, then by flow state (offloaded flows
+ * before flows in the kernel path), then by flow_pps_rate (ascending for
+ * offloaded flows, descending for pending ones).
+ */
+static int
+flow_compare_rebalance(const void *elem1, const void *elem2)
+{
+ const struct udpif_key *f1 = *(struct udpif_key **) elem1;
+ const struct udpif_key *f2 = *(struct udpif_key **) elem2;
+ struct netdev *netdev1;
+ struct netdev *netdev2;
+ float diff;
+ int rv;
+
+ netdev1 = f1->tunnel_netdev ? f1->tunnel_netdev : f1->in_netdev;
+ netdev2 = f2->tunnel_netdev ? f2->tunnel_netdev : f2->in_netdev;
+
+ rv = strcmp(netdev1->name, netdev2->name);
+ if (rv) {
+ return rv;
+ }
+
+ if (f1->offloaded != f2->offloaded) {
+ return f2->offloaded - f1->offloaded;
+ }
+
+ /* Ascending pps-rate for offloaded flows, descending for pending ones. */
+ diff = f1->offloaded ? f1->flow_pps_rate - f2->flow_pps_rate
+ : f2->flow_pps_rate - f1->flow_pps_rate;
+
+ return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
+}
+
+/* Insert flows from pending array during rebalancing */
+static int
+rebalance_insert_pending(struct udpif *udpif, struct udpif_key **pending_flows,
+ int pending_count, int insert_count,
+ float rate_threshold)
+{
+ int i, err = 0;
+ struct udpif_key *flow;
+ int count = 0;
+
+ for (i = 0; i < pending_count; i++) {
+ flow = pending_flows[i];
+
+ /* Stop offloading pending flows if the insert count has been
+ * reached and the flow's pps-rate is below the threshold.
+ */
+ if ((count >= insert_count) &&
+ (flow->flow_pps_rate < rate_threshold)) {
+ break;
+ }
+
+ /* Offload the flow to netdev */
+ err = udpif_flow_program(udpif, flow, DPIF_OP_SKIP_DP);
+
+ if (err == ENOSPC) {
+ /* Stop if we are out of resources */
+ break;
+ }
+
+ if (err) {
+ continue;
+ }
+
+ /* Offload succeeded; delete it from the kernel datapath */
+ udpif_flow_unprogram(udpif, flow, DPIF_OP_SKIP_OFFLOAD);
+
+ /* Change the state of the flow, adjust dpif counters */
+ flow->offloaded = true;
+
+ udpif_set_ukey_backlog_packets(flow);
+ count++;
+ }
+
+ return count;
+}
+
+/* Remove flows from offloaded array during rebalancing */
+static void
+rebalance_remove_offloaded(struct udpif *udpif,
+ struct udpif_key **offloaded_flows,
+ int offload_count)
+{
+ int i;
+ struct udpif_key *flow;
+
+ for (i = 0; i < offload_count; i++) {
+ flow = offloaded_flows[i];
+
+ /* Remove offloaded flow from netdev */
+ udpif_flow_unprogram(udpif, flow, DPIF_OP_SKIP_DP);
+
+ /* Install the flow into kernel path */
+ udpif_flow_program(udpif, flow, DPIF_OP_SKIP_OFFLOAD);
+ flow->offloaded = false;
+
+ udpif_set_ukey_backlog_packets(flow);
+ }
+}
+
+/*
+ * Rebalance offloaded flows on a netdev that's in OOR state.
+ *
+ * The rebalancing is done in two phases. In the first phase, we check if
+ * the pending flows can be offloaded (if some resources became available
+ * in the meantime) by trying to offload each pending flow. If all pending
+ * flows get successfully offloaded, the OOR state is cleared on the netdev
+ * and there's nothing to rebalance.
+ *
+ * If some of the pending flows could not be offloaded, i.e., we still see
+ * the OOR error, then we move to the second phase of rebalancing. In this
+ * phase, the rebalancer compares the pps-rate of the offloaded flow with
+ * the least pps-rate against that of the pending flow with the highest
+ * pps-rate, taken from their respective sorted arrays. If the pps-rate of
+ * the offloaded flow is less than the pps-rate of the pending flow, the
+ * offloaded flow is deleted from the HW/netdev and added to the kernel
+ * datapath, and the pending flow is then offloaded to the HW/netdev. This
+ * is repeated for every pair of offloaded and pending flows in the ordered
+ * lists. The process stops at the first pair where the offloaded flow has
+ * a pps-rate at least as high as the corresponding pending flow. The
+ * entire rebalancing process is repeated in the next iteration.
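+ *
+ * For example, if a netdev's offloaded flows have pps-rates {10, 50, 90}
+ * (sorted ascending) and its pending flows have pps-rates {80, 40}
+ * (sorted descending), the first pair compares 10 with 80 and churns:
+ * the 10-pps flow moves back to the kernel datapath and the 80-pps flow
+ * is offloaded in its place. The second pair compares 50 with 40 and,
+ * since 50 >= 40, the churn stops there.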
+ */
+static bool
+rebalance_device(struct udpif *udpif, struct udpif_key **offloaded_flows,
+ int offload_count, struct udpif_key **pending_flows,
+ int pending_count)
+{
+ /* Phase 1: try to offload all pending flows. Passing pending_count as
+ * the insert count and a zero rate threshold means no flow is filtered
+ * out. */
+ int num_inserted = rebalance_insert_pending(udpif, pending_flows,
+ pending_count, pending_count,
+ 0.0);
+ if (num_inserted) {
+ VLOG_DBG("Offload rebalance: Phase1: inserted %d pending flows\n",
+ num_inserted);
+ }
+
+ /* Adjust pending array */
+ pending_flows = &pending_flows[num_inserted];
+ pending_count -= num_inserted;
+
+ if (!pending_count) {
+ /*
+ * Successfully offloaded all pending flows. The device
+ * is no longer in OOR state; done rebalancing this device.
+ */
+ return false;
+ }
+
+ /*
+ * Phase 2: determine how many offloaded flows to churn by pairing the
+ * lowest-pps offloaded flows with the highest-pps pending flows.
+ */
+ int churn_count = 0;
+ while ((churn_count < offload_count) &&
+ (churn_count < pending_count)) {
+ if (pending_flows[churn_count]->flow_pps_rate <=
+ offloaded_flows[churn_count]->flow_pps_rate) {
+ break;
+ }
+ churn_count++;
+ }
+
+ /* Bail early if nothing to churn */
+ if (!churn_count) {
+ return true;
+ }
+
+ VLOG_DBG("Offload rebalance: Phase2: removing %d offloaded flows",
+ churn_count);
+
+ /* Remove offloaded flows */
+ rebalance_remove_offloaded(udpif, offloaded_flows, churn_count);
+
+ /* Adjust offloaded array */
+ offloaded_flows = &offloaded_flows[churn_count];
+ offload_count -= churn_count;
+
+ /* Replace offloaded flows with pending flows */
+ num_inserted = rebalance_insert_pending(udpif, pending_flows,
+ pending_count, churn_count,
+ offload_count ?
+ offloaded_flows[0]->flow_pps_rate :
+ 0.0);
+ if (num_inserted) {
+ VLOG_DBG("Offload rebalance: Phase2: inserted %d pending flows\n",
+ num_inserted);
+ }
+
+ return true;
+}
+
+/*
+ * Given all ukeys that are active in the datapath, rebalance
+ * offloaded flows on HW netdevs that are in OOR state.
+ */
+static void
+udpif_flow_rebalance(struct udpif *udpif,
+ struct udpif_key **active_flows,
+ int num_active_flows)
+{
+ struct udpif_key **sort_flows = NULL;
+ int total_offload_count = 0;
+ int total_pending_count = 0;
+ int total_flow_count = 0;
+ int oor_netdev_count = 0;
+ struct udpif_key *ukey;
+ struct netdev *netdev;
+ int offload_index = 0;
+ int pending_index;
+ bool oor;
+ int i;
+
+ if (!num_active_flows) {
+ free(active_flows);
+ return;
+ }
+
+ sort_flows = xzalloc(sizeof(*sort_flows) * num_active_flows);
+
+ /* Populate sort_flows[] initially with flows whose input (or tunnel)
+ * netdev is in OOR state.
+ */
+ for (i = 0; i < num_active_flows; i++) {
+ ukey = active_flows[i];
+
+ /* Both input and output netdevs must be available for the flow */
+ if (!ukey->in_netdev || !ukey->out_netdev) {
+ continue;
+ }
+
+ /* Get input netdev for the flow */
+ netdev = ukey->tunnel_netdev ? ukey->tunnel_netdev : ukey->in_netdev;
+
+ /* Is the in-netdev for this flow in OOR state? */
+ if (netdev->hw_info.oor) {
+ /* Add the flow to sort_flows[] */
+ sort_flows[total_flow_count++] = active_flows[i];
+
+ if (ukey->offloaded) {
+ total_offload_count++;
+ if (netdev->hw_info.offload_count++ == 0) {
+ oor_netdev_count++;
+ }
+ } else {
+ total_pending_count++;
+ netdev->hw_info.pending_count++;
+ }
+ }
+ }
+
+ qsort(sort_flows, total_flow_count, sizeof(struct udpif_key *),
+ flow_compare_rebalance);
+
+ /*
+ * We now have a count of offloaded and pending flows on each of the
+ * netdevs that are in OOR state. Rebalance each of those netdevs.
+ */
+ while (oor_netdev_count) {
+ netdev = sort_flows[offload_index]->tunnel_netdev ?
+ sort_flows[offload_index]->tunnel_netdev :
+ sort_flows[offload_index]->in_netdev;
+ /* Flows are grouped by netdev and only flows on OOR netdevs were
+ * added to sort_flows[], so this netdev must be in OOR state; a
+ * bare 'continue' here would loop forever on the same index. */
+ ovs_assert(netdev->hw_info.oor);
+
+ VLOG_DBG("Offload rebalance: netdev: %s is OOR\n", netdev->name);
+ pending_index = offload_index + netdev->hw_info.offload_count;
+ oor = rebalance_device(udpif,
+ &sort_flows[offload_index],
+ netdev->hw_info.offload_count,
+ &sort_flows[pending_index],
+ netdev->hw_info.pending_count);
+
+ netdev->hw_info.oor = oor;
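+ /* Advance past all of this netdev's flows and reset its per-pass
+ * counters. */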
+ offload_index = pending_index + netdev->hw_info.pending_count;
+ netdev->hw_info.offload_count = netdev->hw_info.pending_count = 0;
+ oor_netdev_count--;
+ }
+
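+ /* active_flows was allocated by udpif_flow_rebalance_prepare();
+ * ownership ends here. */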
+ free(sort_flows);
+ free(active_flows);
+}
+
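+/* Initialize 'uop' from 'ukey': as a flow put (create) op if 'put' is
+ * true, otherwise as a flow delete op. */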
+static void
+ukey_to_ukey_op(struct udpif *udpif, struct ukey_op *uop,
+ struct udpif_key *ukey, bool put)
+{
+ if (put) {
+ put_op_init(uop, ukey, DPIF_FP_CREATE);
+ } else {
+ delete_op_init(udpif, uop, ukey);
+ }
+}
+
+#define FLOW_PGM_NUM_OPS 1
+
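+/* Install the flow corresponding to 'ukey'; 'skip_flag' selects whether it
+ * goes to the HW/netdev, the kernel datapath, or both. Returns the
+ * operation's error code. */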
+static int
+udpif_flow_program(struct udpif *udpif, struct udpif_key *ukey,
+ enum dpif_op_skip_type skip_flag)
+{
+ struct dpif_op *opsp;
+ struct ukey_op uop;
+
+ opsp = &uop.dop;
+ ukey_to_ukey_op(udpif, &uop, ukey, true);
+ dpif_operate(udpif->dpif, &opsp, FLOW_PGM_NUM_OPS, skip_flag);
+
+ return opsp->error;
+}
+
+static int
+udpif_flow_unprogram(struct udpif *udpif, struct udpif_key *ukey,
+ enum dpif_op_skip_type skip_flag)
+{
+ struct dpif_op *opsp;
+ struct ukey_op uop;
+
+ opsp = &uop.dop;
+ ukey_to_ukey_op(udpif, &uop, ukey, false);
+ dpif_operate(udpif->dpif, &opsp, FLOW_PGM_NUM_OPS, skip_flag);
+
+ return opsp->error;
+}