diff mbox

[ovs-dev,6/7] ofproto: Implement learning limit.

Message ID 20170225025801.86791-7-diproiettod@vmware.com
State Deferred
Headers show

Commit Message

Daniele Di Proietto Feb. 25, 2017, 2:58 a.m. UTC
With this commit we honor the newly introduced limit to the learn
action.

When learning a new rule (with the limit set), the rule will hold a
reference to a counter.  The ukey that learned the rule will also keep
the same reference,  so the counter will be decremented when both the
ukey and the original rule have been deleted.

This means that there's a small window between the learn flow expiry and
the next revalidation in which new flows are not learned because OVS
thinks that we would exceed the limit.  I think this is better that the
alternative, because if we allow learning in that interval we're not
strictly enforcing the limit, because we still have the datapath flows
that are passing traffic for expired rules.

There's a small corner case when we have to slowpath the ukey.  This
happens when:
* The learned rule has expired (or has been deleted).
* The ukey that learned the rule is still in the datapath.
* No packets hit the datapath flow recently.
In this case we cannot relearn the rule (because there are no new
packets), and the actions might depend on the learn execution, so the
only option is to slowpath the ukey.  I don't think this has big
performance implications since it's done only for ukey with no traffic.

Signed-off-by: Daniele Di Proietto <diproiettod@vmware.com>
---
 lib/odp-util.h                     |  6 ++++--
 ofproto/ofproto-dpif-upcall.c      | 25 ++++++++++++++++++-----
 ofproto/ofproto-dpif-xlate-cache.c |  6 +++++-
 ofproto/ofproto-dpif-xlate-cache.h |  1 +
 ofproto/ofproto-dpif-xlate.c       | 33 +++++++++++++++++++++++++++---
 ofproto/ofproto-dpif-xlate.h       |  3 +++
 ofproto/ofproto-dpif.c             |  9 +++++---
 ofproto/ofproto-dpif.h             |  2 +-
 ofproto/ofproto-provider.h         | 17 +++++++++++++--
 ofproto/ofproto.c                  | 42 ++++++++++++++++++++++++++++++++------
 10 files changed, 121 insertions(+), 23 deletions(-)

Comments

Ben Pfaff March 7, 2017, 6:39 p.m. UTC | #1
On Fri, Feb 24, 2017 at 06:58:00PM -0800, Daniele Di Proietto wrote:
> With this commit we honor the newly introduced limit to the learn
> action.
> 
> When learning a new rule (with the limit set), the rule will hold a
> reference to a counter.  The ukey that learned the rule will also keep
> the same reference,  so the counter will be decremented when both the
> ukey and the original rule have been deleted.
> 
> This means that there's a small window between the learn flow expiry and
> the next revalidation in which new flows are not learned because OVS
> thinks that we would exceed the limit.  I think this is better that the
> alternative, because if we allow learning in that interval we're not
> strictly enforcing the limit, because we still have the datapath flows
> that are passing traffic for expired rules.
> 
> There's a small corner case when we have to slowpath the ukey.  This
> happens when:
> * The learned rule has expired (or has been deleted).
> * The ukey that learned the rule is still in the datapath.
> * No packets hit the datapath flow recently.
> In this case we cannot relearn the rule (because there are no new
> packets), and the actions might depend on the learn execution, so the
> only option is to slowpath the ukey.  I don't think this has big
> performance implications since it's done only for ukey with no traffic.
> 
> Signed-off-by: Daniele Di Proietto <diproiettod@vmware.com>

You mentioned face-to-face that a change I suggested earlier in the
series would simplify this one.  If it's OK, then I'll wait to review
this patch and patch 7 until you've managed to do the simplification.

Thanks,

Ben.
Daniele Di Proietto March 8, 2017, 7:25 p.m. UTC | #2
On 07/03/2017 10:39, "Ben Pfaff" <blp@ovn.org> wrote:

>On Fri, Feb 24, 2017 at 06:58:00PM -0800, Daniele Di Proietto wrote:
>> With this commit we honor the newly introduced limit to the learn
>> action.
>> 
>> When learning a new rule (with the limit set), the rule will hold a
>> reference to a counter.  The ukey that learned the rule will also keep
>> the same reference,  so the counter will be decremented when both the
>> ukey and the original rule have been deleted.
>> 
>> This means that there's a small window between the learn flow expiry and
>> the next revalidation in which new flows are not learned because OVS
>> thinks that we would exceed the limit.  I think this is better that the
>> alternative, because if we allow learning in that interval we're not
>> strictly enforcing the limit, because we still have the datapath flows
>> that are passing traffic for expired rules.
>> 
>> There's a small corner case when we have to slowpath the ukey.  This
>> happens when:
>> * The learned rule has expired (or has been deleted).
>> * The ukey that learned the rule is still in the datapath.
>> * No packets hit the datapath flow recently.
>> In this case we cannot relearn the rule (because there are no new
>> packets), and the actions might depend on the learn execution, so the
>> only option is to slowpath the ukey.  I don't think this has big
>> performance implications since it's done only for ukey with no traffic.
>> 
>> Signed-off-by: Daniele Di Proietto <diproiettod@vmware.com>
>
>You mentioned face-to-face that a change I suggested earlier in the
>series would simplify this one.  If it's OK, then I'll wait to review
>this patch and patch 7 until you've managed to do the simplification.
>
>Thanks,
>
>Ben.

Thanks a lot for looking at this Ben,

I've sent a v2 here:

https://mail.openvswitch.org/pipermail/ovs-dev/2017-March/329524.html
diff mbox

Patch

diff --git a/lib/odp-util.h b/lib/odp-util.h
index 42011bccd..1f10d981d 100644
--- a/lib/odp-util.h
+++ b/lib/odp-util.h
@@ -41,11 +41,13 @@  struct pkt_metadata;
     SPR(SLOW_BFD,        "bfd",        "Consists of BFD packets")       \
     SPR(SLOW_LACP,       "lacp",       "Consists of LACP packets")      \
     SPR(SLOW_STP,        "stp",        "Consists of STP packets")       \
-    SPR(SLOW_LLDP,       "lldp",       "Consists of LLDP packets")    \
+    SPR(SLOW_LLDP,       "lldp",       "Consists of LLDP packets")      \
     SPR(SLOW_CONTROLLER, "controller",                                  \
         "Sends \"packet-in\" messages to the OpenFlow controller")      \
     SPR(SLOW_ACTION,     "action",                                      \
-        "Uses action(s) not supported by datapath")
+        "Uses action(s) not supported by datapath")                     \
+    SPR(SLOW_MAY_LEARN,      "learn",                                   \
+        "New packets may or may not learn new flows")                   \
 
 /* Indexes for slow-path reasons.  Client code uses "enum slow_path_reason"
  * values instead of these, these are just a way to construct those. */
diff --git a/ofproto/ofproto-dpif-upcall.c b/ofproto/ofproto-dpif-upcall.c
index 35b5b7533..60b24e77d 100644
--- a/ofproto/ofproto-dpif-upcall.c
+++ b/ofproto/ofproto-dpif-upcall.c
@@ -252,9 +252,13 @@  enum ukey_state {
 /* Each udpif_key can hold reference to global objects in an ofproto.  These
  * references are stored here. */
 struct ukey_refs {
-    struct recirc_refs recircs;  /* Action recirc IDs with references held. */
+    /* Action recirc IDs with references held. */
+    struct recirc_refs recircs;
+    /* References to counters used for learn action limits. */
+    struct cookie_counter_refs learn_refs;
 };
-#define UKEY_REFS_INIT {RECIRC_REFS_EMPTY_INITIALIZER}
+#define UKEY_REFS_INIT \
+    {RECIRC_REFS_EMPTY_INITIALIZER, COOKIE_COUNTER_REFS_INIT}
 
 /* 'udpif_key's are responsible for tracking the little bit of state udpif
  * needs to do flow expiration which can't be pulled directly from the
@@ -1495,9 +1499,13 @@  ukey_create__(const struct nlattr *key, size_t key_len,
 
     ukey->key_recirc_id = key_recirc_id;
     recirc_refs_init(&ukey->global_refs.recircs);
+    cookie_counter_refs_init(&ukey->global_refs.learn_refs);
     if (xout) {
-        /* Take ownership of the action recirc id references. */
+        /* Take ownership of the action recirc id and learn counters
+         * references. */
         recirc_refs_swap(&ukey->global_refs.recircs, &xout->recircs);
+        cookie_counter_refs_swap(&ukey->global_refs.learn_refs,
+                                 &xout->learn_refs);
     }
 
     return ukey;
@@ -1789,6 +1797,7 @@  ukey_delete__(struct udpif_key *ukey)
             recirc_free_id(ukey->key_recirc_id);
         }
         recirc_refs_unref(&ukey->global_refs.recircs);
+        cookie_counter_refs_unref(&ukey->global_refs.learn_refs);
         xlate_cache_delete(ukey->xcache);
         ofpbuf_delete(ovsrcu_get(struct ofpbuf *, &ukey->actions));
         ovs_mutex_destroy(&ukey->mutex);
@@ -1982,6 +1991,8 @@  revalidate_ukey__(struct udpif *udpif, const struct udpif_key *ukey,
         result = UKEY_MODIFY;
         /* Transfer recirc action ID references to the caller. */
         recirc_refs_swap(&global_refs->recircs, &xoutp->recircs);
+        /* Transfer learn action counter references to the caller. */
+        cookie_counter_refs_swap(&global_refs->learn_refs, &xoutp->learn_refs);
         goto exit;
     }
 
@@ -2226,10 +2237,14 @@  reval_op_init(struct ukey_op *op, enum reval_result result,
         delete_op_init(udpif, op, ukey);
         transition_ukey(ukey, UKEY_EVICTING);
     } else if (result == UKEY_MODIFY) {
-        /* Store the new recircs. */
+        /* Store the new references (to recirc ids and learn counters). */
         recirc_refs_swap(&ukey->global_refs.recircs, &global_refs->recircs);
-        /* Release old recircs. */
+        cookie_counter_refs_swap(&ukey->global_refs.learn_refs,
+                                 &global_refs->learn_refs);
+        /* Release old. */
         recirc_refs_unref(&global_refs->recircs);
+        cookie_counter_refs_unref(&global_refs->learn_refs);
+
         /* ukey->key_recirc_id remains, as the key is the same as before. */
 
         ukey_set_actions(ukey, odp_actions);
diff --git a/ofproto/ofproto-dpif-xlate-cache.c b/ofproto/ofproto-dpif-xlate-cache.c
index d6341c084..cbbeb3a24 100644
--- a/ofproto/ofproto-dpif-xlate-cache.c
+++ b/ofproto/ofproto-dpif-xlate-cache.c
@@ -122,8 +122,12 @@  xlate_push_stats_entry(struct xc_entry *entry,
                             stats->n_packets, stats->n_bytes);
         break;
     case XC_LEARN: {
+        struct cookie_counter_ref *ref;
         enum ofperr error;
-        error = ofproto_flow_mod_learn(entry->learn.ofm, true);
+        /* We already hold a reference to the cookie_counter in the ukey.
+         * Therefore we can safely ignore 'ref'. */
+        error = ofproto_flow_mod_learn(entry->learn.ofm, true,
+                                       entry->learn.limit, &ref);
         if (error) {
             static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
             VLOG_WARN_RL(&rl, "xcache LEARN action execution failed.");
diff --git a/ofproto/ofproto-dpif-xlate-cache.h b/ofproto/ofproto-dpif-xlate-cache.h
index a15adbe64..13f7cbc91 100644
--- a/ofproto/ofproto-dpif-xlate-cache.h
+++ b/ofproto/ofproto-dpif-xlate-cache.h
@@ -93,6 +93,7 @@  struct xc_entry {
         } bond;
         struct {
             struct ofproto_flow_mod *ofm;
+            unsigned limit;
         } learn;
         struct {
             struct ofproto_dpif *ofproto;
diff --git a/ofproto/ofproto-dpif-xlate.c b/ofproto/ofproto-dpif-xlate.c
index b934f3c2b..6385f400f 100644
--- a/ofproto/ofproto-dpif-xlate.c
+++ b/ofproto/ofproto-dpif-xlate.c
@@ -4354,6 +4354,7 @@  xlate_learn_action(struct xlate_ctx *ctx, const struct ofpact_learn *learn)
     learn_mask(learn, ctx->wc);
 
     if (ctx->xin->xcache || ctx->xin->allow_side_effects) {
+        struct cookie_counter_ref *count_ref = NULL;
         uint64_t ofpacts_stub[1024 / 8];
         struct ofputil_flow_mod fm;
         struct ofproto_flow_mod ofm__, *ofm;
@@ -4392,21 +4393,45 @@  xlate_learn_action(struct xlate_ctx *ctx, const struct ofpact_learn *learn)
             ds_destroy(&s);
         }
         error = ofproto_dpif_flow_mod_init_for_learn(ctx->xbridge->ofproto,
-                                                     &fm, ofm);
+                                                     &fm, ofm, &count_ref);
         ofpbuf_uninit(&ofpacts);
 
         if (!error) {
+            bool success = true;
+
             if (ctx->xin->allow_side_effects) {
-                error = ofproto_flow_mod_learn(ofm, ctx->xin->xcache != NULL);
+                error = ofproto_flow_mod_learn(ofm, ctx->xin->xcache != NULL,
+                                               learn->limit, &count_ref);
+            } else if (learn->limit) {
+                if (!count_ref) {
+                    ctx->xout->slow |= SLOW_MAY_LEARN;
+                }
             }
 
-            if (ctx->xin->xcache) {
+            if (count_ref) {
+                cookie_counter_refs_add(&ctx->xout->learn_refs, count_ref);
+            } else if (learn->limit) {
+                success = false;
+            }
+
+            if (learn->flags & NX_LEARN_F_WRITE_RESULT) {
+                nxm_reg_load(&learn->result_dst, success ? 1 : 0,
+                             &ctx->xin->flow, ctx->wc);
+                xlate_report_subfield(ctx, &learn->result_dst);
+            }
+
+            if (success && ctx->xin->xcache) {
                 struct xc_entry *entry;
 
                 entry = xlate_cache_add_entry(ctx->xin->xcache, XC_LEARN);
                 entry->learn.ofm = ofm;
+                entry->learn.limit = learn->limit;
                 ofm = NULL;
             }
+
+            if (OVS_UNLIKELY(ctx->xin->trace && !success)) {
+                xlate_report(ctx, OFT_DETAIL, "Limit exceeded, learn failed");
+            }
         }
 
         if (ctx->xin->xcache) {
@@ -5498,6 +5523,7 @@  xlate_out_uninit(struct xlate_out *xout)
 {
     if (xout) {
         recirc_refs_unref(&xout->recircs);
+        cookie_counter_refs_unref(&xout->learn_refs);
     }
 }
 
@@ -5707,6 +5733,7 @@  xlate_actions(struct xlate_in *xin, struct xlate_out *xout)
     *xout = (struct xlate_out) {
         .slow = 0,
         .recircs = RECIRC_REFS_EMPTY_INITIALIZER,
+        .learn_refs = COOKIE_COUNTER_REFS_INIT,
     };
 
     struct xlate_cfg *xcfg = ovsrcu_get(struct xlate_cfg *, &xcfgp);
diff --git a/ofproto/ofproto-dpif-xlate.h b/ofproto/ofproto-dpif-xlate.h
index 3986f2612..99f9da3ec 100644
--- a/ofproto/ofproto-dpif-xlate.h
+++ b/ofproto/ofproto-dpif-xlate.h
@@ -42,6 +42,9 @@  struct xlate_out {
 
     struct recirc_refs recircs; /* Recirc action IDs on which references are
                                  * held. */
+    struct cookie_counter_refs learn_refs; /* Cookie counters for learn
+                                              limiting on which references are
+                                              held. */
 };
 
 struct xlate_in {
diff --git a/ofproto/ofproto-dpif.c b/ofproto/ofproto-dpif.c
index 89c7b7ffc..f2136eb73 100644
--- a/ofproto/ofproto-dpif.c
+++ b/ofproto/ofproto-dpif.c
@@ -206,14 +206,17 @@  static struct shash init_ofp_ports = SHASH_INITIALIZER(&init_ofp_ports);
 
 /* Initialize 'ofm' for a learn action.  If the rule already existed, reference
  * to that rule is taken, otherwise a new rule is created.  'ofm' keeps the
- * rule reference in both cases. */
+ * rule reference in both cases.  If the rule already existed and it had a
+ * reference to a cookie counter,  the reference will be returned in
+ * '*learned_ref'. */
 enum ofperr
 ofproto_dpif_flow_mod_init_for_learn(struct ofproto_dpif *ofproto,
                                      const struct ofputil_flow_mod *fm,
-                                     struct ofproto_flow_mod *ofm)
+                                     struct ofproto_flow_mod *ofm,
+                                     struct cookie_counter_ref **learn_ref)
 {
     /* This will not take the global 'ofproto_mutex'. */
-    return ofproto_flow_mod_init_for_learn(&ofproto->up, fm, ofm);
+    return ofproto_flow_mod_init_for_learn(&ofproto->up, fm, ofm, learn_ref);
 }
 
 /* Appends 'am' to the queue of asynchronous messages to be sent to the
diff --git a/ofproto/ofproto-dpif.h b/ofproto/ofproto-dpif.h
index 533fadf66..7ff30cd0f 100644
--- a/ofproto/ofproto-dpif.h
+++ b/ofproto/ofproto-dpif.h
@@ -299,7 +299,7 @@  int ofproto_dpif_send_packet(const struct ofport_dpif *, bool oam,
                              struct dp_packet *);
 enum ofperr ofproto_dpif_flow_mod_init_for_learn(
     struct ofproto_dpif *, const struct ofputil_flow_mod *,
-    struct ofproto_flow_mod *);
+    struct ofproto_flow_mod *, struct cookie_counter_ref **);
 
 struct ofport_dpif *ofp_port_to_ofport(const struct ofproto_dpif *,
                                        ofp_port_t);
diff --git a/ofproto/ofproto-provider.h b/ofproto/ofproto-provider.h
index 7d3e929f2..1e0a0d26b 100644
--- a/ofproto/ofproto-provider.h
+++ b/ofproto/ofproto-provider.h
@@ -35,6 +35,7 @@ 
 
 #include "cfm.h"
 #include "classifier.h"
+#include "cookie-counters.h"
 #include "guarded-list.h"
 #include "heap.h"
 #include "hindex.h"
@@ -132,6 +133,11 @@  struct ofproto {
     /* Variable length mf_field mapping. Stores all configured variable length
      * meta-flow fields (struct mf_field) in a switch. */
     struct vl_mff_map vl_mff_map;
+
+    /* Contains the counters used to implement limits for the learn action.
+     * We keep them in a separate map because some provider wants to hold
+     * references to the counters after the rule has been deleted. */
+    struct cookie_counter_map learned_counters;
 };
 
 void ofproto_init_tables(struct ofproto *, int n_tables);
@@ -423,6 +429,10 @@  struct rule {
 
     /* Must hold 'mutex' for both read/write, 'ofproto_mutex' not needed. */
     long long int modified OVS_GUARDED; /* Time of last modification. */
+
+    /* If the rule has been learned with a limit, this holds a reference
+     * to a counter. */
+    struct cookie_counter_ref *learned_count_ref OVS_GUARDED;
 };
 
 void ofproto_rule_ref(struct rule *);
@@ -1927,9 +1937,12 @@  enum ofperr ofproto_flow_mod(struct ofproto *, const struct ofputil_flow_mod *)
     OVS_EXCLUDED(ofproto_mutex);
 enum ofperr ofproto_flow_mod_init_for_learn(struct ofproto *,
                                             const struct ofputil_flow_mod *,
-                                            struct ofproto_flow_mod *)
+                                            struct ofproto_flow_mod *,
+                                            struct cookie_counter_ref **)
     OVS_EXCLUDED(ofproto_mutex);
-enum ofperr ofproto_flow_mod_learn(struct ofproto_flow_mod *, bool keep_ref)
+enum ofperr ofproto_flow_mod_learn(struct ofproto_flow_mod *, bool keep_ref,
+                                   unsigned limit,
+                                   struct cookie_counter_ref **)
     OVS_EXCLUDED(ofproto_mutex);
 enum ofperr ofproto_flow_mod_learn_refresh(struct ofproto_flow_mod *ofm);
 enum ofperr ofproto_flow_mod_learn_start(struct ofproto_flow_mod *ofm)
diff --git a/ofproto/ofproto.c b/ofproto/ofproto.c
index ef4b1d980..b821d9c76 100644
--- a/ofproto/ofproto.c
+++ b/ofproto/ofproto.c
@@ -28,6 +28,7 @@ 
 #include "classifier.h"
 #include "connectivity.h"
 #include "connmgr.h"
+#include "cookie-counters.h"
 #include "coverage.h"
 #include "dp-packet.h"
 #include "hash.h"
@@ -520,6 +521,7 @@  ofproto_create(const char *datapath_name, const char *datapath_type,
     ofproto->connmgr = connmgr_create(ofproto, datapath_name, datapath_name);
     ofproto->min_mtu = INT_MAX;
     cmap_init(&ofproto->groups);
+    cookie_counter_table_init(&ofproto->learned_counters);
     ovs_mutex_unlock(&ofproto_mutex);
     ofproto->ogf.types = 0xf;
     ofproto->ogf.capabilities = OFPGFC_CHAINING | OFPGFC_SELECT_LIVENESS |
@@ -1609,6 +1611,8 @@  ofproto_destroy__(struct ofproto *ofproto)
     ovs_assert(hmap_is_empty(&ofproto->learned_cookies));
     hmap_destroy(&ofproto->learned_cookies);
 
+    cookie_counter_table_destroy(&ofproto->learned_counters);
+
     ofproto->ofproto_class->dealloc(ofproto);
 }
 
@@ -2815,6 +2819,8 @@  ofproto_rule_destroy__(struct rule *rule)
     cls_rule_destroy(CONST_CAST(struct cls_rule *, &rule->cr));
     rule_actions_destroy(rule_get_actions(rule));
     ovs_mutex_destroy(&rule->mutex);
+    cookie_counter_destroy_ref(rule->learned_count_ref);
+    rule->learned_count_ref = NULL;
     rule->ofproto->ofproto_class->rule_dealloc(rule);
 }
 
@@ -4910,15 +4916,19 @@  ofproto_rule_create(struct ofproto *ofproto, struct cls_rule *cr,
 
 /* Initialize 'ofm' for a learn action.  If the rule already existed, reference
  * to that rule is taken, otherwise a new rule is created.  'ofm' keeps the
- * rule reference in both.  This does not take the global 'ofproto_mutex'. */
+ * rule reference in both.  This does not take the global 'ofproto_mutex'.
+ * If the rule already existed and it had a reference to a cookie counter,
+ * the reference will be returned in '*learned_ref'. */
 enum ofperr
 ofproto_flow_mod_init_for_learn(struct ofproto *ofproto,
                                 const struct ofputil_flow_mod *fm,
-                                struct ofproto_flow_mod *ofm)
+                                struct ofproto_flow_mod *ofm,
+                                struct cookie_counter_ref **learn_ref)
     OVS_EXCLUDED(ofproto_mutex)
 {
     enum ofperr error;
 
+    *learn_ref = NULL;
     /* Reject flow mods that do not look like they were generated by a learn
      * action. */
     if (fm->command != OFPFC_MODIFY_STRICT || fm->table_id == OFPTT_ALL
@@ -4949,12 +4959,15 @@  ofproto_flow_mod_init_for_learn(struct ofproto *ofproto,
                              actions->ofpacts, actions->ofpacts_len)) {
             /* Rule already exists and need not change, except for the modified
              * timestamp.  Get a reference to the existing rule. */
+            *learn_ref = rule->learned_count_ref;
             ovs_mutex_unlock(&rule->mutex);
             if (!ofproto_rule_try_ref(rule)) {
+                *learn_ref = NULL;
                 rule = NULL; /* Pretend it did not exist. */
             }
         } else {
             ovs_mutex_unlock(&rule->mutex);
+            *learn_ref = NULL;
             rule = NULL;
         }
     }
@@ -4962,6 +4975,7 @@  ofproto_flow_mod_init_for_learn(struct ofproto *ofproto,
     /* Initialize ofproto_flow_mod for future use. */
     error = ofproto_flow_mod_init(ofproto, ofm, fm, rule);
     if (error) {
+        *learn_ref = NULL;
         ofproto_rule_unref(rule);
         return error;
     }
@@ -5062,10 +5076,16 @@  ofproto_flow_mod_learn_finish(struct ofproto_flow_mod *ofm,
  * 'keep_ref' is true, then a reference to the current rule is held, otherwise
  * it is released and 'ofm->temp_rule' is set to NULL.
  *
+ * If 'limit' != 0, insertion will fail if there are more than 'limit' rules
+ * in the same table with the same cookie.  If insertion succeeds,
+ * '*learned_ref' will contain a reference to the counter used to track the
+ * limit.
+ *
  * Caller needs to be the exclusive owner of 'ofm' as it is being manipulated
  * during the call. */
 enum ofperr
-ofproto_flow_mod_learn(struct ofproto_flow_mod *ofm, bool keep_ref)
+ofproto_flow_mod_learn(struct ofproto_flow_mod *ofm, bool keep_ref,
+                       unsigned limit, struct cookie_counter_ref **learned_ref)
     OVS_EXCLUDED(ofproto_mutex)
 {
     enum ofperr error = ofproto_flow_mod_learn_refresh(ofm);
@@ -5073,11 +5093,21 @@  ofproto_flow_mod_learn(struct ofproto_flow_mod *ofm, bool keep_ref)
 
     /* Do we need to insert the rule? */
     if (!error && rule->state == RULE_INITIALIZED) {
+        struct cookie_counter_ref *ref = NULL;
         ovs_mutex_lock(&ofproto_mutex);
         ofm->version = rule->ofproto->tables_version + 1;
-        error = ofproto_flow_mod_learn_start(ofm);
-        if (!error) {
-            ofproto_flow_mod_learn_finish(ofm, NULL);
+        if (limit) {
+            ref = cookie_counter_new_ref(&rule->ofproto->learned_counters,
+                                         rule->flow_cookie, rule->table_id,
+                                         limit);
+            rule->learned_count_ref = ref;
+            *learned_ref = ref;
+        }
+        if (!limit || ref) {
+            error = ofproto_flow_mod_learn_start(ofm);
+            if (!error) {
+                ofproto_flow_mod_learn_finish(ofm, NULL);
+            }
         }
         ovs_mutex_unlock(&ofproto_mutex);
     }