diff mbox series

[ovs-dev,v2,5/8] controller: Update MAC binding timestamp

Message ID 20230731133538.3027989-6-amusil@redhat.com
State Accepted
Headers show
Series Add MAC binding aging timestamp refresh mechanism | expand

Checks

Context Check Description
ovsrobot/apply-robot success apply and check: success
ovsrobot/github-robot-_Build_and_Test success github build: passed
ovsrobot/github-robot-_ovn-kubernetes success github build: passed

Commit Message

Ales Musil July 31, 2023, 1:35 p.m. UTC
To achieve that add thread that will handle
statistics requests and delegate the processing
to defined functions. This allows the thread to
be flexible enough, so it could be extended in future
if needed.

At the same time connected the thread with the MAC
cache I-P node to have the timestamp updates. The
updates should happen once per dump_period
(3/4 of the aging threshold) per chassis only if
the MAC binding is actively used.

Reported-at: https://bugzilla.redhat.com/2189924
Signed-off-by: Ales Musil <amusil@redhat.com>
---
v2: Rebase on top of current main.
    Address comments from Mark:
    - Fix wrong test comment.
    - Allow the stats node request to be more generic.
    - Use the precomputed dump_period.
---
 controller/automake.mk      |   4 +-
 controller/mac_cache.c      |  93 ++++++++
 controller/mac_cache.h      |   7 +
 controller/ovn-controller.c |  13 +-
 controller/statctrl.c       | 435 ++++++++++++++++++++++++++++++++++++
 controller/statctrl.h       |  28 +++
 tests/ovn.at                |  23 +-
 7 files changed, 596 insertions(+), 7 deletions(-)
 create mode 100644 controller/statctrl.c
 create mode 100644 controller/statctrl.h
diff mbox series

Patch

diff --git a/controller/automake.mk b/controller/automake.mk
index 562290359..0dbbd5d26 100644
--- a/controller/automake.mk
+++ b/controller/automake.mk
@@ -45,7 +45,9 @@  controller_ovn_controller_SOURCES = \
 	controller/mirror.h \
 	controller/mirror.c \
 	controller/mac_cache.h \
-	controller/mac_cache.c
+	controller/mac_cache.c \
+	controller/statctrl.h \
+	controller/statctrl.c
 
 controller_ovn_controller_LDADD = lib/libovn.la $(OVS_LIBDIR)/libopenvswitch.la
 man_MANS += controller/ovn-controller.8
diff --git a/controller/mac_cache.c b/controller/mac_cache.c
index a1a8354aa..c89de6b24 100644
--- a/controller/mac_cache.c
+++ b/controller/mac_cache.c
@@ -45,6 +45,8 @@  mac_cache_threshold_get_value_ms(const struct sbrec_datapath_binding *dp,
 static void
 mac_cache_threshold_remove(struct hmap *thresholds,
                            struct mac_cache_threshold *threshold);
+static void
+mac_cache_update_req_delay(struct hmap *thresholds, uint64_t *req_delay);
 
 bool
 mac_cache_threshold_add(struct mac_cache_data *data,
@@ -167,6 +169,84 @@  mac_cache_mac_bindings_clear(struct mac_cache_data *data)
     }
 }
 
+struct mac_cache_stats {
+    struct ovs_list list_node;
+
+    int64_t idle_age_ms;
+
+    union {
+        /* Common data to identify MAC binding. */
+        struct mac_cache_mb_data mb;
+    } data;
+};
+
+void
+mac_cache_mb_stats_process_flow_stats(struct ovs_list *stats_list,
+                                      struct ofputil_flow_stats *ofp_stats)
+{
+    struct mac_cache_stats *stats = xmalloc(sizeof *stats);
+
+    stats->idle_age_ms = ofp_stats->idle_age * 1000;
+    stats->data.mb = (struct mac_cache_mb_data) {
+        .port_key = ofp_stats->match.flow.regs[MFF_LOG_INPORT - MFF_REG0],
+        .dp_key = ntohll(ofp_stats->match.flow.metadata),
+        .mac = ofp_stats->match.flow.dl_src
+    };
+
+    if (ofp_stats->match.flow.dl_type == htons(ETH_TYPE_IP)) {
+        stats->data.mb.ip = in6_addr_mapped_ipv4(ofp_stats->match.flow.nw_src);
+    } else {
+        stats->data.mb.ip = ofp_stats->match.flow.ipv6_src;
+    }
+
+    ovs_list_push_back(stats_list, &stats->list_node);
+}
+
+void
+mac_cache_mb_stats_run(struct ovs_list *stats_list, uint64_t *req_delay,
+                       void *data)
+{
+    struct mac_cache_data *cache_data = data;
+    struct hmap *thresholds = &cache_data->thresholds[MAC_CACHE_MAC_BINDING];
+    long long timewall_now = time_wall_msec();
+
+    struct mac_cache_stats *stats;
+    LIST_FOR_EACH_POP (stats, list_node, stats_list) {
+        struct mac_cache_mac_binding *mc_mb =
+                mac_cache_mac_binding_find(cache_data, &stats->data.mb);
+        if (!mc_mb) {
+            free(stats);
+            continue;
+        }
+
+        struct uuid *dp_uuid = &mc_mb->sbrec_mb->datapath->header_.uuid;
+        struct mac_cache_threshold *threshold =
+                mac_cache_threshold_find(thresholds, dp_uuid);
+
+        /* If "idle_age" is under threshold it means that the mac binding is
+         * used on this chassis. Also make sure that we don't update the
+         * timestamp more than once during the dump period. */
+        if (stats->idle_age_ms < threshold->value &&
+            (timewall_now - mc_mb->sbrec_mb->timestamp) >=
+            threshold->dump_period) {
+            sbrec_mac_binding_set_timestamp(mc_mb->sbrec_mb, timewall_now);
+        }
+
+        free(stats);
+    }
+
+    mac_cache_update_req_delay(thresholds, req_delay);
+}
+
+void
+mac_cache_stats_destroy(struct ovs_list *stats_list)
+{
+    struct mac_cache_stats *stats;
+    LIST_FOR_EACH_POP (stats, list_node, stats_list) {
+        free(stats);
+    }
+}
+
 static uint32_t
 mac_cache_mb_data_hash(const struct mac_cache_mb_data *mb_data)
 {
@@ -268,3 +348,16 @@  mac_cache_threshold_remove(struct hmap *thresholds,
     hmap_remove(thresholds, &threshold->hmap_node);
     free(threshold);
 }
+
+static void
+mac_cache_update_req_delay(struct hmap *thresholds, uint64_t *req_delay)
+{
+    struct mac_cache_threshold *threshold;
+
+    uint64_t dump_period = UINT64_MAX;
+    HMAP_FOR_EACH (threshold, hmap_node, thresholds) {
+        dump_period = MIN(dump_period, threshold->dump_period);
+    }
+
+    *req_delay = dump_period < UINT64_MAX ? dump_period : 0;
+}
diff --git a/controller/mac_cache.h b/controller/mac_cache.h
index af853a090..2a3dc66f0 100644
--- a/controller/mac_cache.h
+++ b/controller/mac_cache.h
@@ -80,4 +80,11 @@  void mac_cache_mac_binding_remove(struct mac_cache_data *data,
 void mac_cache_mac_bindings_clear(struct mac_cache_data *data);
 bool mac_cache_sb_mac_binding_updated(const struct sbrec_mac_binding *mb);
 
+void
+mac_cache_mb_stats_process_flow_stats(struct ovs_list *stats_list,
+                                      struct ofputil_flow_stats *ofp_stats);
+void mac_cache_mb_stats_run(struct ovs_list *stats_list, uint64_t *req_delay,
+                            void *data);
+void mac_cache_stats_destroy(struct ovs_list *stats_list);
+
 #endif /* controller/mac_cache.h */
diff --git a/controller/ovn-controller.c b/controller/ovn-controller.c
index 7b2fd18f4..f050571f0 100644
--- a/controller/ovn-controller.c
+++ b/controller/ovn-controller.c
@@ -84,6 +84,7 @@ 
 #include "hmapx.h"
 #include "mirror.h"
 #include "mac_cache.h"
+#include "statctrl.h"
 
 VLOG_DEFINE_THIS_MODULE(main);
 
@@ -4891,6 +4892,7 @@  main(int argc, char *argv[])
     lflow_init();
     mirror_init();
     vif_plug_provider_initialize();
+    statctrl_init();
 
     /* Connect to OVS OVSDB instance. */
     struct ovsdb_idl_loop ovs_idl_loop = OVSDB_IDL_LOOP_INITIALIZER(
@@ -5236,7 +5238,7 @@  main(int argc, char *argv[])
                      runtime_data_ovs_interface_shadow_handler);
 
     engine_add_input(&en_mac_cache, &en_runtime_data,
-                     engine_noop_handler);
+                     mac_cache_runtime_data_handler);
     engine_add_input(&en_mac_cache, &en_sb_mac_binding,
                      mac_cache_sb_mac_binding_handler);
     engine_add_input(&en_mac_cache, &en_sb_datapath_binding,
@@ -5298,6 +5300,8 @@  main(int argc, char *argv[])
         engine_get_internal_data(&en_template_vars);
     struct ed_type_lb_data *lb_data =
         engine_get_internal_data(&en_lb_data);
+    struct mac_cache_data *mac_cache_data =
+            engine_get_internal_data(&en_mac_cache);
 
     ofctrl_init(&lflow_output_data->group_table,
                 &lflow_output_data->meter_table,
@@ -5686,6 +5690,11 @@  main(int argc, char *argv[])
                         }
                     }
 
+                    if (mac_cache_data) {
+                        statctrl_update(br_int->name);
+                        statctrl_run(ovnsb_idl_txn, mac_cache_data);
+                    }
+
                     ofctrl_seqno_update_create(
                         ofctrl_seq_type_nb_cfg,
                         get_nb_cfg(sbrec_sb_global_table_get(
@@ -5795,6 +5804,7 @@  main(int argc, char *argv[])
             if (br_int) {
                 ofctrl_wait();
                 pinctrl_wait(ovnsb_idl_txn);
+                statctrl_wait(ovnsb_idl_txn);
             }
 
             binding_wait();
@@ -5929,6 +5939,7 @@  loop_done:
     patch_destroy();
     mirror_destroy();
     encaps_destroy();
+    statctrl_destroy();
     if_status_mgr_destroy(if_mgr);
     shash_destroy(&vif_plug_deleted_iface_ids);
     shash_destroy(&vif_plug_changed_iface_ids);
diff --git a/controller/statctrl.c b/controller/statctrl.c
new file mode 100644
index 000000000..92325fbeb
--- /dev/null
+++ b/controller/statctrl.c
@@ -0,0 +1,435 @@ 
+/* Copyright (c) 2023, Red Hat, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <config.h>
+
+#include "byte-order.h"
+#include "dirs.h"
+#include "latch.h"
+#include "lflow.h"
+#include "mac_cache.h"
+#include "openvswitch/ofp-errors.h"
+#include "openvswitch/ofp-flow.h"
+#include "openvswitch/ofp-msgs.h"
+#include "openvswitch/ofp-print.h"
+#include "openvswitch/ofp-util.h"
+#include "openvswitch/poll-loop.h"
+#include "openvswitch/rconn.h"
+#include "openvswitch/vlog.h"
+#include "ovn/logical-fields.h"
+#include "ovs-thread.h"
+#include "seq.h"
+#include "socket-util.h"
+#include "statctrl.h"
+
+VLOG_DEFINE_THIS_MODULE(statctrl);
+
+enum stat_type {
+    STATS_MAC_BINDING = 0,
+    STATS_MAX,
+};
+
+struct stats_node {
+    /* The statistics request. */
+    struct  ofputil_flow_stats_request request;
+    /* xid of the last statistics request. */
+    ovs_be32 xid;
+    /* Timestamp when the next request should happen. */
+    int64_t next_request_timestamp;
+    /* Request delay in ms. */
+    uint64_t request_delay;
+    /* List of processed statistics. */
+    struct ovs_list stats_list;
+    /* Function to clean up the node.
+     * This function runs in main thread. */
+    void (*destroy)(struct ovs_list *stats_list);
+    /* Function to process the response and store it in the list.
+     * This function runs in statctrl thread locked behind mutex. */
+    void (*process_flow_stats)(struct ovs_list *stats_list,
+                               struct ofputil_flow_stats *ofp_stats);
+    /* Function to process the parsed stats.
+     * This function runs in main thread locked behind mutex. */
+    void (*run)(struct ovs_list *stats_list, uint64_t *req_delay, void *data);
+};
+
+#define STATS_NODE(NAME, REQUEST, DESTROY, PROCESS, RUN)                   \
+    statctrl_ctx.nodes[STATS_##NAME] = (struct stats_node) {               \
+        .request = REQUEST,                                                \
+        .xid = 0,                                                          \
+        .next_request_timestamp = INT64_MAX,                               \
+        .request_delay = 0,                                                \
+        .stats_list =                                                      \
+            OVS_LIST_INITIALIZER(                                          \
+                &statctrl_ctx.nodes[STATS_##NAME].stats_list),             \
+        .destroy = DESTROY,                                                \
+        .process_flow_stats = PROCESS,                                     \
+        .run = RUN                                                         \
+    };
+
+struct statctrl_ctx {
+    char *br_int;
+
+    pthread_t thread;
+    struct latch exit_latch;
+
+    struct seq *thread_seq;
+    struct seq *main_seq;
+
+    struct stats_node nodes[STATS_MAX];
+};
+
+static struct statctrl_ctx statctrl_ctx;
+static struct ovs_mutex mutex;
+
+static void *statctrl_thread_handler(void *arg);
+static void statctrl_rconn_setup(struct rconn *swconn, char *conn_target)
+    OVS_REQUIRES(mutex);
+static void statctrl_handle_rconn_msg(struct rconn *swconn,
+                                      struct statctrl_ctx *ctx,
+                                      struct ofpbuf *msg);
+static enum stat_type statctrl_get_stat_type(struct statctrl_ctx *ctx,
+                                             const struct ofp_header *oh);
+static void statctrl_decode_statistics_reply(struct stats_node *node,
+                                             struct ofpbuf *msg)
+    OVS_REQUIRES(mutex);
+static void statctrl_send_request(struct rconn *swconn,
+                                  struct statctrl_ctx *ctx)
+    OVS_REQUIRES(mutex);
+static void statctrl_notify_main_thread(struct statctrl_ctx *ctx);
+static void statctrl_set_conn_target(const char *br_int_name)
+    OVS_REQUIRES(mutex);
+static void statctrl_wait_next_request(struct statctrl_ctx *ctx)
+    OVS_REQUIRES(mutex);
+static bool statctrl_update_next_request_timestamp(struct stats_node *node,
+                                                   long long now,
+                                                   uint64_t prev_delay)
+    OVS_REQUIRES(mutex);
+
+void
+statctrl_init(void)
+{
+    statctrl_ctx.br_int = NULL;
+    latch_init(&statctrl_ctx.exit_latch);
+    ovs_mutex_init(&mutex);
+    statctrl_ctx.thread_seq = seq_create();
+    statctrl_ctx.main_seq = seq_create();
+
+    /* Definition of all stat nodes. */
+    struct ofputil_flow_stats_request mac_binding_request = {
+            .cookie = htonll(0),
+            .cookie_mask = htonll(0),
+            .out_port = OFPP_ANY,
+            .out_group = OFPG_ANY,
+            .table_id = OFTABLE_MAC_CACHE_USE,
+    };
+    STATS_NODE(MAC_BINDING, mac_binding_request, mac_cache_stats_destroy,
+               mac_cache_mb_stats_process_flow_stats, mac_cache_mb_stats_run);
+
+
+    statctrl_ctx.thread = ovs_thread_create("ovn_statctrl",
+                                            statctrl_thread_handler,
+                                            &statctrl_ctx);
+}
+
+void
+statctrl_run(struct ovsdb_idl_txn *ovnsb_idl_txn,
+             struct mac_cache_data *mac_cache_data)
+{
+    if (!ovnsb_idl_txn) {
+        return;
+    }
+
+    void *node_data[STATS_MAX] = {mac_cache_data};
+
+    bool schedule_updated = false;
+    long long now = time_msec();
+
+    ovs_mutex_lock(&mutex);
+    for (size_t i = 0; i < STATS_MAX; i++) {
+        struct stats_node *node = &statctrl_ctx.nodes[i];
+        uint64_t prev_delay = node->request_delay;
+
+        node->run(&node->stats_list, &node->request_delay, node_data[i]);
+
+        schedule_updated |=
+                statctrl_update_next_request_timestamp(node, now, prev_delay);
+    }
+    ovs_mutex_unlock(&mutex);
+
+    if (schedule_updated) {
+        seq_change(statctrl_ctx.thread_seq);
+    }
+}
+
+void
+statctrl_update(const char *br_int_name)
+{
+    ovs_mutex_lock(&mutex);
+    statctrl_set_conn_target(br_int_name);
+    ovs_mutex_unlock(&mutex);
+}
+
+void
+statctrl_wait(struct ovsdb_idl_txn *ovnsb_idl_txn)
+{
+    if (!ovnsb_idl_txn) {
+        return;
+    }
+
+    ovs_mutex_lock(&mutex);
+    for (size_t i = 0; i < STATS_MAX; i++) {
+        struct stats_node *node = &statctrl_ctx.nodes[i];
+        if (!ovs_list_is_empty(&node->stats_list)) {
+            poll_immediate_wake();
+        }
+    }
+    int64_t new_seq = seq_read(statctrl_ctx.main_seq);
+    seq_wait(statctrl_ctx.main_seq, new_seq);
+    ovs_mutex_unlock(&mutex);
+}
+
+void
+statctrl_destroy(void)
+{
+    latch_set(&statctrl_ctx.exit_latch);
+    pthread_join(statctrl_ctx.thread, NULL);
+    latch_destroy(&statctrl_ctx.exit_latch);
+    free(statctrl_ctx.br_int);
+    seq_destroy(statctrl_ctx.thread_seq);
+    seq_destroy(statctrl_ctx.main_seq);
+
+    for (size_t i = 0; i < STATS_MAX; i++) {
+        struct stats_node *node = &statctrl_ctx.nodes[i];
+        node->destroy(&node->stats_list);
+    }
+}
+
+static void *
+statctrl_thread_handler(void *arg)
+{
+    struct statctrl_ctx *ctx = arg;
+
+    /* OpenFlow connection to the switch. */
+    struct rconn *swconn = rconn_create(5, 0, DSCP_DEFAULT,
+                                        1 << OFP15_VERSION);
+
+    while (!latch_is_set(&ctx->exit_latch)) {
+        ovs_mutex_lock(&mutex);
+        statctrl_rconn_setup(swconn, ctx->br_int);
+        ovs_mutex_unlock(&mutex);
+
+        rconn_run(swconn);
+        uint64_t new_seq = seq_read(ctx->thread_seq);
+
+        if (rconn_is_connected(swconn)) {
+            for (int i = 0; i < 100; i++) {
+                struct ofpbuf *msg = rconn_recv(swconn);
+
+                if (!msg) {
+                    break;
+                }
+
+                statctrl_handle_rconn_msg(swconn, ctx, msg);
+                ofpbuf_delete(msg);
+            }
+
+            ovs_mutex_lock(&mutex);
+            statctrl_send_request(swconn, ctx);
+            ovs_mutex_unlock(&mutex);
+        }
+
+        statctrl_notify_main_thread(ctx);
+        rconn_run_wait(swconn);
+        rconn_recv_wait(swconn);
+        ovs_mutex_lock(&mutex);
+        statctrl_wait_next_request(ctx);
+        ovs_mutex_unlock(&mutex);
+        seq_wait(ctx->thread_seq, new_seq);
+        latch_wait(&ctx->exit_latch);
+
+        poll_block();
+    }
+
+    rconn_destroy(swconn);
+    return NULL;
+}
+
+static void
+statctrl_rconn_setup(struct rconn *swconn, char *br_int)
+    OVS_REQUIRES(mutex)
+{
+    if (!br_int) {
+        rconn_disconnect(swconn);
+        return;
+    }
+
+    char *conn_target = xasprintf("unix:%s/%s.mgmt", ovs_rundir(), br_int);
+
+    if (strcmp(conn_target, rconn_get_target(swconn))) {
+        VLOG_INFO("%s: connecting to switch", conn_target);
+        rconn_connect(swconn, conn_target, conn_target);
+    }
+
+    free(conn_target);
+}
+
+static void
+statctrl_handle_rconn_msg(struct rconn *swconn, struct statctrl_ctx *ctx,
+                          struct ofpbuf *msg)
+{
+    enum ofptype type;
+    const struct ofp_header *oh = msg->data;
+
+    ofptype_decode(&type, oh);
+
+    if (type == OFPTYPE_ECHO_REQUEST) {
+        rconn_send(swconn, ofputil_encode_echo_reply(oh), NULL);
+    } else if (type == OFPTYPE_FLOW_STATS_REPLY) {
+        enum stat_type stype = statctrl_get_stat_type(ctx, oh);
+        if (stype == STATS_MAX) {
+            return;
+        }
+
+        ovs_mutex_lock(&mutex);
+        statctrl_decode_statistics_reply(&ctx->nodes[stype], msg);
+        ovs_mutex_unlock(&mutex);
+    } else {
+        if (VLOG_IS_DBG_ENABLED()) {
+
+            static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(30, 300);
+
+            char *s = ofp_to_string(oh, ntohs(oh->length), NULL, NULL, 2);
+
+            VLOG_DBG_RL(&rl, "OpenFlow packet ignored: %s", s);
+            free(s);
+        }
+    }
+}
+
+static enum stat_type
+statctrl_get_stat_type(struct statctrl_ctx *ctx, const struct ofp_header *oh)
+{
+    for (size_t i = 0; i < STATS_MAX; i++) {
+        if (ctx->nodes[i].xid == oh->xid) {
+            return i;
+        }
+    }
+    return STATS_MAX;
+}
+
+static void
+statctrl_decode_statistics_reply(struct stats_node *node, struct ofpbuf *msg)
+    OVS_REQUIRES(mutex)
+{
+    struct ofpbuf ofpacts;
+    ofpbuf_init(&ofpacts, 0);
+
+    while (true) {
+        struct ofputil_flow_stats fs;
+
+        int error = ofputil_decode_flow_stats_reply(&fs, msg, true, &ofpacts);
+        if (error == EOF) {
+            break;
+        } else if (error) {
+            VLOG_DBG("Couldn't parse stat reply: %s", ofperr_to_string(error));
+            break;
+        }
+
+        node->process_flow_stats(&node->stats_list, &fs);
+    }
+
+    ofpbuf_uninit(&ofpacts);
+}
+
+static void
+statctrl_send_request(struct rconn *swconn, struct statctrl_ctx *ctx)
+    OVS_REQUIRES(mutex)
+{
+    long long now = time_msec();
+    enum ofp_version version = rconn_get_version(swconn);
+    enum ofputil_protocol proto = ofputil_protocol_from_ofp_version(version);
+
+    for (size_t i = 0; i < STATS_MAX; i++) {
+        struct stats_node *node = &ctx->nodes[i];
+
+        if (now < node->next_request_timestamp) {
+            continue;
+        }
+
+        struct ofpbuf *msg =
+                ofputil_encode_flow_stats_request(&node->request, proto);
+        node->xid = ((struct ofp_header *) msg->data)->xid;
+
+        statctrl_update_next_request_timestamp(node, now, 0);
+
+        rconn_send(swconn, msg, NULL);
+    }
+}
+
+static void
+statctrl_notify_main_thread(struct statctrl_ctx *ctx)
+{
+    for (size_t i = 0; i < STATS_MAX; i++) {
+        if (!ovs_list_is_empty(&ctx->nodes[i].stats_list)) {
+            seq_change(ctx->main_seq);
+            return;
+        }
+    }
+}
+
+static void
+statctrl_set_conn_target(const char *br_int_name)
+    OVS_REQUIRES(mutex)
+{
+    if (!br_int_name) {
+        return;
+    }
+
+
+    if (!statctrl_ctx.br_int || strcmp(statctrl_ctx.br_int, br_int_name)) {
+        free(statctrl_ctx.br_int);
+        statctrl_ctx.br_int = xstrdup(br_int_name);
+        /* Notify statctrl thread that integration bridge is set/changed. */
+        seq_change(statctrl_ctx.thread_seq);
+    }
+}
+
+static void
+statctrl_wait_next_request(struct statctrl_ctx *ctx)
+    OVS_REQUIRES(mutex)
+{
+    for (size_t i = 0; i < STATS_MAX; i++) {
+        int64_t timestamp = ctx->nodes[i].next_request_timestamp;
+        if (timestamp < INT64_MAX) {
+            poll_timer_wait_until(timestamp);
+        }
+    }
+}
+
+static bool
+statctrl_update_next_request_timestamp(struct stats_node *node,
+                                       long long now, uint64_t prev_delay)
+{
+    if (!node->request_delay) {
+        node->next_request_timestamp = INT64_MAX;
+        return false;
+    }
+
+    int64_t timestamp = prev_delay ? node->next_request_timestamp : now;
+    node->next_request_timestamp =
+            timestamp + node->request_delay - prev_delay;
+
+    return timestamp != node->next_request_timestamp;
+}
diff --git a/controller/statctrl.h b/controller/statctrl.h
new file mode 100644
index 000000000..c5cede353
--- /dev/null
+++ b/controller/statctrl.h
@@ -0,0 +1,28 @@ 
+/* Copyright (c) 2023, Red Hat, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef STATCTRL_H
+#define STATCTRL_H
+
+#include "mac_cache.h"
+
+void statctrl_init(void);
+void statctrl_run(struct ovsdb_idl_txn *ovnsb_idl_txn,
+                  struct mac_cache_data *mac_cache_data);
+void statctrl_update(const char *br_int_name);
+void statctrl_wait(struct ovsdb_idl_txn *ovnsb_idl_txn);
+void statctrl_destroy(void);
+
+#endif /* controller/statctrl.h */
diff --git a/tests/ovn.at b/tests/ovn.at
index f73c31974..04a71bedb 100644
--- a/tests/ovn.at
+++ b/tests/ovn.at
@@ -34614,6 +34614,8 @@  AT_CHECK_UNQUOTED([as hv1 ovs-ofctl dump-flows br-int table=79 --no-stats | stri
  table=79, priority=100,ip,reg14=${port_key},metadata=${dp_key},dl_src=00:00:00:00:10:20,nw_src=192.168.10.20 actions=drop
 ])
 
+timestamp=$(fetch_column mac_binding timestamp ip="192.168.10.20")
+
 send_udp hv1 ext1 10
 send_udp hv2 ext2 20
 
@@ -34621,16 +34623,27 @@  OVS_WAIT_UNTIL([as hv1 ovs-ofctl dump-flows br-int table=79 | grep "192.168.10.1
 OVS_WAIT_UNTIL([as hv2 ovs-ofctl dump-flows br-int table=79 | grep "192.168.10.20" | grep -q "n_packets=1"])
 
 # Set the MAC binding aging threshold
-AT_CHECK([ovn-nbctl set logical_router gw options:mac_binding_age_threshold=1])
-AT_CHECK([fetch_column nb:logical_router options | grep -q mac_binding_age_threshold=1])
+AT_CHECK([ovn-nbctl set logical_router gw options:mac_binding_age_threshold=5])
+AT_CHECK([fetch_column nb:logical_router options | grep -q mac_binding_age_threshold=5])
 AT_CHECK([ovn-nbctl --wait=sb sync])
 
+# Wait send few packets for "192.168.10.20" to indicate that it is still in use
+send_udp hv2 ext2 20
+sleep 1
+send_udp hv2 ext2 20
+
 # Set the timeout for OVS_WAIT* functions to 5 seconds
-OVS_CTL_TIMEOUT=5
+OVS_CTL_TIMEOUT=10
+OVS_WAIT_UNTIL([
+    test "$timestamp" != "$(fetch_column mac_binding timestamp ip='192.168.10.20')"
+])
+check $(test "$(fetch_column mac_binding timestamp ip='192.168.10.20')" != "")
+
 # Check if the records are removed after some inactivity
 OVS_WAIT_UNTIL([
     test "0" = "$(ovn-sbctl list mac_binding | grep -c '192.168.10.10')"
 ])
+# The second one takes longer because it got refreshed
 OVS_WAIT_UNTIL([
     test "0" = "$(ovn-sbctl list mac_binding | grep -c '192.168.10.20')"
 ])
@@ -35313,8 +35326,8 @@  as hv1
 check ovs-vsctl add-br br-phys
 ovn_attach n1 br-phys 192.168.0.1
 
-dnl Ensure that there are at least 3 openflow connections.
-OVS_WAIT_UNTIL([test "$(grep -c 'negotiated OpenFlow version' hv1/ovs-vswitchd.log)" -eq "3"])
+dnl Ensure that there are 4 openflow connections.
+OVS_WAIT_UNTIL([test "$(grep -c 'negotiated OpenFlow version' hv1/ovs-vswitchd.log)" -eq "4"])
 
 dnl "Wait" 3 times 60 seconds and ensure ovn-controller writes to the
 dnl openflow connections in the meantime.  This should allow ovs-vswitchd