diff mbox series

[ovs-dev,5/5] controller: Update MAC binding timestamp

Message ID 20230710110517.128560-6-amusil@redhat.com
State Superseded
Headers show
Series Add MAC binding aging timestamp refresh mechanism | expand

Checks

Context Check Description
ovsrobot/apply-robot success apply and check: success
ovsrobot/github-robot-_Build_and_Test success github build: passed
ovsrobot/github-robot-_ovn-kubernetes success github build: passed

Commit Message

Ales Musil July 10, 2023, 11:05 a.m. UTC
To achieve that, add a thread that will handle
statistics requests and delegate the processing
to defined functions. This allows the thread to
be flexible enough that it can be extended in the
future if needed.

At the same time, connect the thread with the MAC
cache I-P node to have the timestamp updates. The
updates should happen once per dump_period
(3/4 of the aging threshold) per chassis, and only if
the MAC binding is actively used.

Signed-off-by: Ales Musil <amusil@redhat.com>
---
 controller/automake.mk      |   4 +-
 controller/mac_cache.c      |  84 +++++++
 controller/mac_cache.h      |   7 +
 controller/ovn-controller.c |  11 +
 controller/statctrl.c       | 434 ++++++++++++++++++++++++++++++++++++
 controller/statctrl.h       |  28 +++
 tests/ovn.at                |  23 +-
 7 files changed, 585 insertions(+), 6 deletions(-)
 create mode 100644 controller/statctrl.c
 create mode 100644 controller/statctrl.h

Comments

Mark Michelson July 27, 2023, 7:16 p.m. UTC | #1
Hi Ales, I had a look through this patch finally :)

 From a high level, I appreciate the design. Periodically requesting 
flow stats in a background thread makes good sense.

I have a couple of suggestions to make to the design.

On 7/10/23 07:05, Ales Musil wrote:
> To achieve that add thread that will handle
> statistics requests and delegate the processing
> to defined functions. This allows the thread to
> be flexible enough, so it could be extended in future
> if needed.
> 
> At the same time connected the thread with the MAC
> cache I-P node to have the timestamp updates. The
> updates should happen once per dump_period
> (3/4 of the aging threshold) per chassis only if
> the MAC binding is actively used.
> 
> Signed-off-by: Ales Musil <amusil@redhat.com>
> ---
>   controller/automake.mk      |   4 +-
>   controller/mac_cache.c      |  84 +++++++
>   controller/mac_cache.h      |   7 +
>   controller/ovn-controller.c |  11 +
>   controller/statctrl.c       | 434 ++++++++++++++++++++++++++++++++++++
>   controller/statctrl.h       |  28 +++
>   tests/ovn.at                |  23 +-
>   7 files changed, 585 insertions(+), 6 deletions(-)
>   create mode 100644 controller/statctrl.c
>   create mode 100644 controller/statctrl.h
> 
> diff --git a/controller/automake.mk b/controller/automake.mk
> index 562290359..0dbbd5d26 100644
> --- a/controller/automake.mk
> +++ b/controller/automake.mk
> @@ -45,7 +45,9 @@ controller_ovn_controller_SOURCES = \
>   	controller/mirror.h \
>   	controller/mirror.c \
>   	controller/mac_cache.h \
> -	controller/mac_cache.c
> +	controller/mac_cache.c \
> +	controller/statctrl.h \
> +	controller/statctrl.c
>   
>   controller_ovn_controller_LDADD = lib/libovn.la $(OVS_LIBDIR)/libopenvswitch.la
>   man_MANS += controller/ovn-controller.8
> diff --git a/controller/mac_cache.c b/controller/mac_cache.c
> index 4663499a1..6f1d661d4 100644
> --- a/controller/mac_cache.c
> +++ b/controller/mac_cache.c
> @@ -252,3 +252,87 @@ mac_cache_threshold_remove(struct hmap *thresholds,
>       hmap_remove(thresholds, &threshold->hmap_node);
>       free(threshold);
>   }
> +
> +struct mac_cache_mb_stats {
> +    struct ovs_list list_node;
> +
> +    int64_t idle_age_ms;
> +    uint32_t cookie;
> +    /* Common data to identify MAC binding. */
> +    struct mac_cache_mb_data data;
> +};
> +
> +void
> +mac_cache_mb_stats_process_flow_stats(struct ovs_list *stats_list,
> +                                      struct ofputil_flow_stats *ofp_stats)
> +{
> +    struct mac_cache_mb_stats *stats = xmalloc(sizeof *stats);
> +
> +    stats->idle_age_ms = ofp_stats->idle_age * 1000;
> +    stats->cookie = ntohll(ofp_stats->cookie);
> +    stats->data.port_key =
> +            ofp_stats->match.flow.regs[MFF_LOG_INPORT - MFF_REG0];
> +    stats->data.dp_key = ntohll(ofp_stats->match.flow.metadata);
> +
> +    if (ofp_stats->match.flow.dl_type == htons(ETH_TYPE_IP)) {
> +        stats->data.ip = in6_addr_mapped_ipv4(ofp_stats->match.flow.nw_src);
> +    } else {
> +        stats->data.ip = ofp_stats->match.flow.ipv6_src;
> +    }
> +
> +    stats->data.mac = ofp_stats->match.flow.dl_src;
> +
> +    ovs_list_push_back(stats_list, &stats->list_node);
> +}
> +
> +void
> +mac_cache_mb_stats_destroy(struct ovs_list *stats_list)
> +{
> +    struct mac_cache_mb_stats *stats;
> +    LIST_FOR_EACH_POP (stats, list_node, stats_list) {
> +        free(stats);
> +    }
> +}
> +
> +void
> +mac_cache_mb_stats_run(struct ovs_list *stats_list, uint64_t *req_delay,
> +                       void *data)
> +{
> +    struct mac_cache_data *cache_data = data;
> +    long long timewall_now = time_wall_msec();
> +
> +    struct mac_cache_threshold *threshold;
> +    struct mac_cache_mb_stats *stats;
> +    struct mac_cache_mac_binding *mc_mb;
> +    LIST_FOR_EACH_POP (stats, list_node, stats_list) {
> +        mc_mb = mac_cache_mac_binding_find_by_mb_data(cache_data,
> +                                                      &stats->data);
> +
> +        if (!mc_mb) {
> +            free(stats);
> +            continue;
> +        }
> +
> +        struct uuid *dp_uuid = &mc_mb->sbrec_mb->datapath->header_.uuid;
> +        threshold = mac_cache_threshold_find(&cache_data->mb_thresholds,
> +                                             dp_uuid);
> +
> +        uint64_t dump_period = (3 * threshold->value) / 4;

The dump period correlates directly with the configured threshold. 
Perhaps the dump period could be stored on the mac_cache_threshold and 
updated whenever the threshold->value is changed. This way, you would 
not have to calculate it twice every time this function is run.

I also suggest offloading the dump_period calculation to a function so 
that it is easy to change if desired.

> +        /* If "idle_age" is under threshold it means that the mac binding is
> +         * used on this chassis. Also make sure that we don't update the
> +         * timestamp more than once during the dump period. */
> +        if (stats->idle_age_ms < threshold->value &&
> +            (timewall_now - mc_mb->sbrec_mb->timestamp) >= dump_period) {
> +            sbrec_mac_binding_set_timestamp(mc_mb->sbrec_mb, timewall_now);
> +        }
> +
> +        free(stats);
> +    }
> +
> +    uint64_t dump_period = UINT64_MAX;
> +    HMAP_FOR_EACH (threshold, hmap_node, &cache_data->mb_thresholds) {
> +        dump_period = MIN(dump_period, (3 * threshold->value) / 4);
> +    }
> +
> +    *req_delay = dump_period < UINT64_MAX ? dump_period : 0;
> +}
> diff --git a/controller/mac_cache.h b/controller/mac_cache.h
> index f1f1772c8..a29713908 100644
> --- a/controller/mac_cache.h
> +++ b/controller/mac_cache.h
> @@ -71,4 +71,11 @@ void mac_cache_mac_binding_remove(struct mac_cache_data *data,
>   void mac_cache_mac_bindings_destroy(struct mac_cache_data *data);
>   bool mac_cache_sb_mac_binding_updated(const struct sbrec_mac_binding *mb);
>   
> +void
> +mac_cache_mb_stats_process_flow_stats(struct ovs_list *stats_list,
> +                                      struct ofputil_flow_stats *ofp_stats);
> +void mac_cache_mb_stats_destroy(struct ovs_list *stats_list);
> +void mac_cache_mb_stats_run(struct ovs_list *stats_list, uint64_t *req_delay,
> +                            void *data);
> +
>   #endif /* controller/mac_cache.h */
> diff --git a/controller/ovn-controller.c b/controller/ovn-controller.c
> index abb18647a..bdf7368b8 100644
> --- a/controller/ovn-controller.c
> +++ b/controller/ovn-controller.c
> @@ -84,6 +84,7 @@
>   #include "hmapx.h"
>   #include "mirror.h"
>   #include "mac_cache.h"
> +#include "statctrl.h"
>   
>   VLOG_DEFINE_THIS_MODULE(main);
>   
> @@ -4804,6 +4805,7 @@ main(int argc, char *argv[])
>       lflow_init();
>       mirror_init();
>       vif_plug_provider_initialize();
> +    statctrl_init();
>   
>       /* Connect to OVS OVSDB instance. */
>       struct ovsdb_idl_loop ovs_idl_loop = OVSDB_IDL_LOOP_INITIALIZER(
> @@ -5205,6 +5207,8 @@ main(int argc, char *argv[])
>           engine_get_internal_data(&en_template_vars);
>       struct ed_type_lb_data *lb_data =
>           engine_get_internal_data(&en_lb_data);
> +    struct mac_cache_data *mac_cache_data =
> +            engine_get_internal_data(&en_mac_cache);
>   
>       ofctrl_init(&lflow_output_data->group_table,
>                   &lflow_output_data->meter_table,
> @@ -5593,6 +5597,11 @@ main(int argc, char *argv[])
>                           }
>                       }
>   
> +                    if (mac_cache_data) {
> +                        statctrl_update(br_int->name);
> +                        statctrl_run(ovnsb_idl_txn, mac_cache_data);
> +                    }
> +
>                       ofctrl_seqno_update_create(
>                           ofctrl_seq_type_nb_cfg,
>                           get_nb_cfg(sbrec_sb_global_table_get(
> @@ -5702,6 +5711,7 @@ main(int argc, char *argv[])
>               if (br_int) {
>                   ofctrl_wait();
>                   pinctrl_wait(ovnsb_idl_txn);
> +                statctrl_wait(ovnsb_idl_txn);
>               }
>   
>               binding_wait();
> @@ -5836,6 +5846,7 @@ loop_done:
>       patch_destroy();
>       mirror_destroy();
>       encaps_destroy();
> +    statctrl_destroy();
>       if_status_mgr_destroy(if_mgr);
>       shash_destroy(&vif_plug_deleted_iface_ids);
>       shash_destroy(&vif_plug_changed_iface_ids);
> diff --git a/controller/statctrl.c b/controller/statctrl.c
> new file mode 100644
> index 000000000..9bef827fc
> --- /dev/null
> +++ b/controller/statctrl.c
> @@ -0,0 +1,434 @@
> +/* Copyright (c) 2023, Red Hat, Inc.
> + *
> + * Licensed under the Apache License, Version 2.0 (the "License");
> + * you may not use this file except in compliance with the License.
> + * You may obtain a copy of the License at:
> + *
> + *     http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +#include <config.h>
> +
> +#include "byte-order.h"
> +#include "dirs.h"
> +#include "latch.h"
> +#include "lflow.h"
> +#include "mac_cache.h"
> +#include "openvswitch/ofp-errors.h"
> +#include "openvswitch/ofp-flow.h"
> +#include "openvswitch/ofp-msgs.h"
> +#include "openvswitch/ofp-print.h"
> +#include "openvswitch/ofp-util.h"
> +#include "openvswitch/poll-loop.h"
> +#include "openvswitch/rconn.h"
> +#include "openvswitch/vlog.h"
> +#include "ovn/logical-fields.h"
> +#include "ovs-thread.h"
> +#include "seq.h"
> +#include "socket-util.h"
> +#include "statctrl.h"
> +
> +VLOG_DEFINE_THIS_MODULE(statctrl);
> +
> +enum stat_type {
> +    STATS_MAC_BINDING = 0,
> +    STATS_MAX,
> +};
> +
> +struct stats_node {
> +    /* Table ID for the statistics request. */
> +    uint8_t table_id;

Instead of using a table_id here, would it make sense to store a struct 
ofputil_flow_stats_request instead? This way, if stats nodes care about 
group IDs or cookies, they can store them here.

> +    /* xid of the last statistics request. */
> +    ovs_be32 xid;
> +    /* Timestamp when the next request should happen. */
> +    int64_t next_request_timestamp;
> +    /* Request delay in ms. */
> +    uint64_t request_delay;
> +    /* List of processed statistics. */
> +    struct ovs_list stats_list;
> +    /* Function to clean up the node.
> +     * This function runs in main thread. */
> +    void (*destroy)(struct ovs_list *stats_list);
> +    /* Function to process the response and store it in the list.
> +     * This function runs in statctrl thread locked behind mutex. */
> +    void (*process_flow_stats)(struct ovs_list *stats_list,
> +                               struct ofputil_flow_stats *ofp_stats);
> +    /* Function to process the parsed stats.
> +     * This function runs in main thread locked behind mutex. */
> +    void (*run)(struct ovs_list *stats_list, uint64_t *req_delay, void *data);
> +};
> +
> +#define STATS_NODE(NAME, TABLE_ID, DESTROY, PROCESS, RUN)                  \
> +    statctrl_ctx.nodes[STATS_##NAME] = (struct stats_node) {               \
> +        .table_id = TABLE_ID,                                              \
> +        .xid = 0,                                                          \
> +        .next_request_timestamp = INT64_MAX,                               \
> +        .request_delay = 0,                                                \
> +        .stats_list =                                                      \
> +            OVS_LIST_INITIALIZER(                                          \
> +                &statctrl_ctx.nodes[STATS_##NAME].stats_list),             \
> +        .destroy = DESTROY,                                                \
> +        .process_flow_stats = PROCESS,                                     \
> +        .run = RUN                                                         \
> +    };
> +
> +struct statctrl_ctx {
> +    char *br_int;
> +
> +    pthread_t thread;
> +    struct latch exit_latch;
> +
> +    struct seq *thread_seq;
> +    struct seq *main_seq;
> +
> +    struct stats_node nodes[STATS_MAX];
> +};
> +
> +static struct statctrl_ctx statctrl_ctx;
> +static struct ovs_mutex mutex;
> +
> +static void *statctrl_thread_handler(void *arg);
> +static void statctrl_rconn_setup(struct rconn *swconn, char *conn_target)
> +    OVS_REQUIRES(mutex);
> +static void statctrl_handle_rconn_msg(struct rconn *swconn,
> +                                      struct statctrl_ctx *ctx,
> +                                      struct ofpbuf *msg);
> +static enum stat_type statctrl_get_stat_type(struct statctrl_ctx *ctx,
> +                                             const struct ofp_header *oh);
> +static void statctrl_decode_statistics_reply(struct stats_node *node,
> +                                             struct ofpbuf *msg)
> +    OVS_REQUIRES(mutex);
> +static void statctrl_send_request(struct rconn *swconn,
> +                                  struct statctrl_ctx *ctx)
> +    OVS_REQUIRES(mutex);
> +static void statctrl_notify_main_thread(struct statctrl_ctx *ctx);
> +static void statctrl_set_conn_target(const char *br_int_name)
> +    OVS_REQUIRES(mutex);
> +static void statctrl_wait_next_request(struct statctrl_ctx *ctx)
> +    OVS_REQUIRES(mutex);
> +static bool statctrl_update_next_request_timestamp(struct stats_node *node,
> +                                                   long long now,
> +                                                   uint64_t prev_delay)
> +    OVS_REQUIRES(mutex);
> +
> +void
> +statctrl_init(void)
> +{
> +    statctrl_ctx.br_int = NULL;
> +    latch_init(&statctrl_ctx.exit_latch);
> +    ovs_mutex_init(&mutex);
> +    statctrl_ctx.thread_seq = seq_create();
> +    statctrl_ctx.main_seq = seq_create();
> +
> +    /* Definition of all stat nodes. */
> +    STATS_NODE(MAC_BINDING, OFTABLE_MAC_CACHE_USE, mac_cache_mb_stats_destroy,
> +               mac_cache_mb_stats_process_flow_stats, mac_cache_mb_stats_run);
> +
> +
> +    statctrl_ctx.thread = ovs_thread_create("ovn_statctrl",
> +                                            statctrl_thread_handler,
> +                                            &statctrl_ctx);
> +}
> +
> +void
> +statctrl_run(struct ovsdb_idl_txn *ovnsb_idl_txn,
> +             struct mac_cache_data *mac_cache_data)
> +{
> +    if (!ovnsb_idl_txn) {
> +        return;
> +    }
> +
> +    void *node_data[STATS_MAX] = {mac_cache_data};

I was thinking about what happens when more than just the MAC cache 
needs to collect flow statistics. Based on this initial design, my 
assumption is that the function signature for statctrl_run() would grow 
as each new stats node type is defined:

void
statctrl_run(struct ovsdb_idl_txn *ovnsb_idl_txn,
              struct mac_cache_data *mac_cache_data,
              struct foo_data *foo_data,
              struct bar_data *bar_data)

Is that the intention? I suppose this will be OK as long as the number 
of stats nodes stays low. But if we suspect that we will end up with a 
lot more, then we may want to switch to passing in, say, a shash of data 
instead of each individual stats node data type. My biggest concern here 
is merge conflicts when trying to backport patches.

Or did you have a different idea in mind for how new stats data types 
would be handled?

> +
> +    bool schedule_updated = false;
> +    long long now = time_msec();
> +
> +    ovs_mutex_lock(&mutex);
> +    for (size_t i = 0; i < STATS_MAX; i++) {
> +        struct stats_node *node = &statctrl_ctx.nodes[i];
> +        uint64_t prev_delay = node->request_delay;
> +
> +        node->run(&node->stats_list, &node->request_delay, node_data[i]);
> +
> +        schedule_updated |=
> +                statctrl_update_next_request_timestamp(node, now, prev_delay);
> +    }
> +    ovs_mutex_unlock(&mutex);
> +
> +    if (schedule_updated) {
> +        seq_change(statctrl_ctx.thread_seq);
> +    }
> +}
> +
> +void
> +statctrl_update(const char *br_int_name)
> +{
> +    ovs_mutex_lock(&mutex);
> +    statctrl_set_conn_target(br_int_name);
> +    ovs_mutex_unlock(&mutex);
> +}
> +
> +void
> +statctrl_wait(struct ovsdb_idl_txn *ovnsb_idl_txn)
> +{
> +    if (!ovnsb_idl_txn) {
> +        return;
> +    }
> +
> +    ovs_mutex_lock(&mutex);
> +    for (size_t i = 0; i < STATS_MAX; i++) {
> +        struct stats_node *node = &statctrl_ctx.nodes[i];
> +        if (!ovs_list_is_empty(&node->stats_list)) {
> +            poll_immediate_wake();
> +        }
> +    }
> +    int64_t new_seq = seq_read(statctrl_ctx.main_seq);
> +    seq_wait(statctrl_ctx.main_seq, new_seq);
> +    ovs_mutex_unlock(&mutex);
> +}
> +
> +void
> +statctrl_destroy(void)
> +{
> +    latch_set(&statctrl_ctx.exit_latch);
> +    pthread_join(statctrl_ctx.thread, NULL);
> +    latch_destroy(&statctrl_ctx.exit_latch);
> +    free(statctrl_ctx.br_int);
> +    seq_destroy(statctrl_ctx.thread_seq);
> +    seq_destroy(statctrl_ctx.main_seq);
> +
> +    for (size_t i = 0; i < STATS_MAX; i++) {
> +        struct stats_node *node = &statctrl_ctx.nodes[i];
> +        node->destroy(&node->stats_list);
> +    }
> +}
> +
> +static void *
> +statctrl_thread_handler(void *arg)
> +{
> +    struct statctrl_ctx *ctx = arg;
> +
> +    /* OpenFlow connection to the switch. */
> +    struct rconn *swconn = rconn_create(5, 0, DSCP_DEFAULT,
> +                                        1 << OFP15_VERSION);
> +
> +    while (!latch_is_set(&ctx->exit_latch)) {
> +        ovs_mutex_lock(&mutex);
> +        statctrl_rconn_setup(swconn, ctx->br_int);
> +        ovs_mutex_unlock(&mutex);
> +
> +        rconn_run(swconn);
> +        uint64_t new_seq = seq_read(ctx->thread_seq);
> +
> +        if (rconn_is_connected(swconn)) {
> +            for (int i = 0; i < 100; i++) {
> +                struct ofpbuf *msg = rconn_recv(swconn);
> +
> +                if (!msg) {
> +                    break;
> +                }
> +
> +                statctrl_handle_rconn_msg(swconn, ctx, msg);
> +                ofpbuf_delete(msg);
> +            }
> +
> +            ovs_mutex_lock(&mutex);
> +            statctrl_send_request(swconn, ctx);
> +            ovs_mutex_unlock(&mutex);
> +        }
> +
> +        statctrl_notify_main_thread(ctx);
> +        rconn_run_wait(swconn);
> +        rconn_recv_wait(swconn);
> +        ovs_mutex_lock(&mutex);
> +        statctrl_wait_next_request(ctx);
> +        ovs_mutex_unlock(&mutex);
> +        seq_wait(ctx->thread_seq, new_seq);
> +        latch_wait(&ctx->exit_latch);
> +
> +        poll_block();
> +    }
> +
> +    rconn_destroy(swconn);
> +    return NULL;
> +}
> +
> +static void
> +statctrl_rconn_setup(struct rconn *swconn, char *br_int)
> +    OVS_REQUIRES(mutex)
> +{
> +    if (!br_int) {
> +        rconn_disconnect(swconn);
> +        return;
> +    }
> +
> +    char *conn_target = xasprintf("unix:%s/%s.mgmt", ovs_rundir(), br_int);
> +
> +    if (strcmp(conn_target, rconn_get_target(swconn))) {
> +        VLOG_INFO("%s: connecting to switch", conn_target);
> +        rconn_connect(swconn, conn_target, conn_target);
> +    }
> +
> +    free(conn_target);
> +}
> +
> +static void
> +statctrl_handle_rconn_msg(struct rconn *swconn, struct statctrl_ctx *ctx,
> +                          struct ofpbuf *msg)
> +{
> +    enum ofptype type;
> +    const struct ofp_header *oh = msg->data;
> +
> +    ofptype_decode(&type, oh);
> +
> +    if (type == OFPTYPE_ECHO_REQUEST) {
> +        rconn_send(swconn, ofputil_encode_echo_reply(oh), NULL);
> +    } else if (type == OFPTYPE_FLOW_STATS_REPLY) {
> +        enum stat_type stype = statctrl_get_stat_type(ctx, oh);
> +        if (stype == STATS_MAX) {
> +            return;
> +        }
> +
> +        ovs_mutex_lock(&mutex);
> +        statctrl_decode_statistics_reply(&ctx->nodes[stype], msg);
> +        ovs_mutex_unlock(&mutex);
> +    } else {
> +        if (VLOG_IS_DBG_ENABLED()) {
> +
> +            static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(30, 300);
> +
> +            char *s = ofp_to_string(oh, ntohs(oh->length), NULL, NULL, 2);
> +
> +            VLOG_DBG_RL(&rl, "OpenFlow packet ignored: %s", s);
> +            free(s);
> +        }
> +    }
> +}
> +
> +static enum stat_type
> +statctrl_get_stat_type(struct statctrl_ctx *ctx, const struct ofp_header *oh)
> +{
> +    for (size_t i = 0; i < STATS_MAX; i++) {
> +        if (ctx->nodes[i].xid == oh->xid) {
> +            return i;
> +        }
> +    }
> +    return STATS_MAX;
> +}
> +
> +static void
> +statctrl_decode_statistics_reply(struct stats_node *node, struct ofpbuf *msg)
> +    OVS_REQUIRES(mutex)
> +{
> +    struct ofpbuf ofpacts;
> +    ofpbuf_init(&ofpacts, 0);
> +
> +    while (true) {
> +        struct ofputil_flow_stats fs;
> +
> +        int error = ofputil_decode_flow_stats_reply(&fs, msg, true, &ofpacts);
> +        if (error == EOF) {
> +            break;
> +        } else if (error) {
> +            VLOG_DBG("Couldn't parse stat reply: %s", ofperr_to_string(error));
> +            break;
> +        }
> +
> +        node->process_flow_stats(&node->stats_list, &fs);
> +    }
> +
> +    ofpbuf_uninit(&ofpacts);
> +}
> +
> +static void
> +statctrl_send_request(struct rconn *swconn, struct statctrl_ctx *ctx)
> +    OVS_REQUIRES(mutex)
> +{
> +    long long now = time_msec();
> +    enum ofp_version version = rconn_get_version(swconn);
> +    enum ofputil_protocol proto = ofputil_protocol_from_ofp_version(version);
> +
> +    for (size_t i = 0; i < STATS_MAX; i++) {
> +        struct stats_node *node = &ctx->nodes[i];
> +
> +        if (now < node->next_request_timestamp) {
> +            continue;
> +        }
> +
> +        struct ofputil_flow_stats_request fsr = {
> +                .cookie = htonll(0),
> +                .cookie_mask = htonll(0),
> +                .out_port = OFPP_ANY,
> +                .out_group = OFPG_ANY,
> +                .table_id = node->table_id,
> +        };
> +        struct ofpbuf *msg = ofputil_encode_flow_stats_request(&fsr, proto);
> +        node->xid = ((struct ofp_header *) msg->data)->xid;
> +
> +        statctrl_update_next_request_timestamp(node, now, 0);
> +
> +        rconn_send(swconn, msg, NULL);
> +    }
> +}
> +
> +static void
> +statctrl_notify_main_thread(struct statctrl_ctx *ctx)
> +{
> +    for (size_t i = 0; i < STATS_MAX; i++) {
> +        if (!ovs_list_is_empty(&ctx->nodes[i].stats_list)) {
> +            seq_change(ctx->main_seq);
> +            return;
> +        }
> +    }
> +}
> +
> +static void
> +statctrl_set_conn_target(const char *br_int_name)
> +    OVS_REQUIRES(mutex)
> +{
> +    if (!br_int_name) {
> +        return;
> +    }
> +
> +
> +    if (!statctrl_ctx.br_int || strcmp(statctrl_ctx.br_int, br_int_name)) {
> +        free(statctrl_ctx.br_int);
> +        statctrl_ctx.br_int = xstrdup(br_int_name);
> +        /* Notify statctrl thread that integration bridge is set/changed. */
> +        seq_change(statctrl_ctx.thread_seq);
> +    }
> +}
> +
> +static void
> +statctrl_wait_next_request(struct statctrl_ctx *ctx)
> +    OVS_REQUIRES(mutex)
> +{
> +    for (size_t i = 0; i < STATS_MAX; i++) {
> +        int64_t timestamp = ctx->nodes[i].next_request_timestamp;
> +        if (timestamp < INT64_MAX) {
> +            poll_timer_wait_until(timestamp);
> +        }
> +    }
> +}
> +
> +static bool
> +statctrl_update_next_request_timestamp(struct stats_node *node,
> +                                       long long now, uint64_t prev_delay)
> +{
> +    if (!node->request_delay) {
> +        node->next_request_timestamp = INT64_MAX;
> +        return false;
> +    }
> +
> +    int64_t timestamp = prev_delay ? node->next_request_timestamp : now;
> +    node->next_request_timestamp =
> +            timestamp + node->request_delay - prev_delay;
> +
> +    return timestamp != node->next_request_timestamp;
> +}
> diff --git a/controller/statctrl.h b/controller/statctrl.h
> new file mode 100644
> index 000000000..c5cede353
> --- /dev/null
> +++ b/controller/statctrl.h
> @@ -0,0 +1,28 @@
> +/* Copyright (c) 2023, Red Hat, Inc.
> + *
> + * Licensed under the Apache License, Version 2.0 (the "License");
> + * you may not use this file except in compliance with the License.
> + * You may obtain a copy of the License at:
> + *
> + *     http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +#ifndef STATCTRL_H
> +#define STATCTRL_H
> +
> +#include "mac_cache.h"
> +
> +void statctrl_init(void);
> +void statctrl_run(struct ovsdb_idl_txn *ovnsb_idl_txn,
> +                  struct mac_cache_data *mac_cache_data);
> +void statctrl_update(const char *br_int_name);
> +void statctrl_wait(struct ovsdb_idl_txn *ovnsb_idl_txn);
> +void statctrl_destroy(void);
> +
> +#endif /* controller/statctrl.h */
> diff --git a/tests/ovn.at b/tests/ovn.at
> index 7cee8a175..f4c28c57b 100644
> --- a/tests/ovn.at
> +++ b/tests/ovn.at
> @@ -34556,8 +34556,10 @@ AT_CHECK([fetch_column nb:logical_router options name="gw" | grep -q mac_binding
>   send_garp hv1 ext1 10
>   send_garp hv2 ext2 20
>   
> -OVS_WAIT_UNTIL([ovn-sbctl list mac_binding | grep -q "192.168.10.10"])
> -OVS_WAIT_UNTIL([ovn-sbctl list mac_binding | grep -q "192.168.10.20"])
> +wait_row_count mac_binding 1 ip="192.168.10.10"
> +wait_row_count mac_binding 1 ip="192.168.10.20"
> +
> +timestamp=$(fetch_column mac_binding timestamp ip="192.168.10.20")
>   
>   send_udp hv1 ext1 10
>   send_udp hv2 ext2 20
> @@ -34566,16 +34568,27 @@ OVS_WAIT_UNTIL([as hv1 ovs-ofctl dump-flows br-int table=79 | grep "192.168.10.1
>   OVS_WAIT_UNTIL([as hv2 ovs-ofctl dump-flows br-int table=79 | grep "192.168.10.20" | grep -q "n_packets=1"])
>   
>   # Set the MAC binding aging threshold
> -AT_CHECK([ovn-nbctl set logical_router gw options:mac_binding_age_threshold=1])
> -AT_CHECK([fetch_column nb:logical_router options | grep -q mac_binding_age_threshold=1])
> +AT_CHECK([ovn-nbctl set logical_router gw options:mac_binding_age_threshold=5])
> +AT_CHECK([fetch_column nb:logical_router options | grep -q mac_binding_age_threshold=5])
>   AT_CHECK([ovn-nbctl --wait=sb sync])
>   
> +# Wait send few packets for "192.168.10.20" to indicate that it is still in use
> +send_udp hv2 ext2 20
> +sleep 1
> +send_udp hv2 ext2 20
> +
>   # Set the timeout for OVS_WAIT* functions to 5 seconds
>   OVS_CTL_TIMEOUT=5
> +OVS_WAIT_UNTIL([
> +    test "$timestamp" != "$(fetch_column mac_binding timestamp ip='192.168.10.20')"
> +])
> +check $(test "$(fetch_column mac_binding timestamp ip='192.168.10.20')" != "")
> +
>   # Check if the records are removed after some inactivity
>   OVS_WAIT_UNTIL([
>       test "0" = "$(ovn-sbctl list mac_binding | grep -c '192.168.10.10')"
>   ])
> +# The second one takes longer because it got refreshed
>   OVS_WAIT_UNTIL([
>       test "0" = "$(ovn-sbctl list mac_binding | grep -c '192.168.10.20')"
>   ])
> @@ -35257,7 +35270,7 @@ check ovs-vsctl add-br br-phys
>   ovn_attach n1 br-phys 192.168.0.1
>   
>   dnl Ensure that there are at least 3 openflow connections.

Nit: The comment is already incorrect since it checks for an exact 
number of connections instead of "at least" some number. But it also 
needs to be updated to say 4 instead of 3.

> -OVS_WAIT_UNTIL([test "$(grep -c 'negotiated OpenFlow version' hv1/ovs-vswitchd.log)" -eq "3"])
> +OVS_WAIT_UNTIL([test "$(grep -c 'negotiated OpenFlow version' hv1/ovs-vswitchd.log)" -eq "4"])
>   
>   dnl "Wait" 3 times 60 seconds and ensure ovn-controller writes to the
>   dnl openflow connections in the meantime.  This should allow ovs-vswitchd
Ales Musil July 28, 2023, 6:17 a.m. UTC | #2
On Thu, Jul 27, 2023 at 9:17 PM Mark Michelson <mmichels@redhat.com> wrote:

> Hi Ales, I had a look through this patch finally :)
>
>  From a high-level, I appreciate the design. Periodically requesting
> flow stats in a background thread makes good sense.
>
> I have a couple of suggestions to make to the design.
>

Hi Mark,

thank you for the review.


>
> On 7/10/23 07:05, Ales Musil wrote:
> > To achieve that add thread that will handle
> > statistics requests and delegate the processing
> > to defined functions. This allows the thread to
> > be flexible enough, so it could be extended in future
> > if needed.
> >
> > At the same time connected the thread with the MAC
> > cache I-P node to have the timestamp updates. The
> > updates should happen once per dump_period
> > (3/4 of the aging threshold) per chassis only if
> > the MAC binding is actively used.
> >
> > Signed-off-by: Ales Musil <amusil@redhat.com>
> > ---
> >   controller/automake.mk      |   4 +-
> >   controller/mac_cache.c      |  84 +++++++
> >   controller/mac_cache.h      |   7 +
> >   controller/ovn-controller.c |  11 +
> >   controller/statctrl.c       | 434 ++++++++++++++++++++++++++++++++++++
> >   controller/statctrl.h       |  28 +++
> >   tests/ovn.at                |  23 +-
> >   7 files changed, 585 insertions(+), 6 deletions(-)
> >   create mode 100644 controller/statctrl.c
> >   create mode 100644 controller/statctrl.h
> >
> > diff --git a/controller/automake.mk b/controller/automake.mk
> > index 562290359..0dbbd5d26 100644
> > --- a/controller/automake.mk
> > +++ b/controller/automake.mk
> > @@ -45,7 +45,9 @@ controller_ovn_controller_SOURCES = \
> >       controller/mirror.h \
> >       controller/mirror.c \
> >       controller/mac_cache.h \
> > -     controller/mac_cache.c
> > +     controller/mac_cache.c \
> > +     controller/statctrl.h \
> > +     controller/statctrl.c
> >
> >   controller_ovn_controller_LDADD = lib/libovn.la $(OVS_LIBDIR)/libopenvswitch.la
> >   man_MANS += controller/ovn-controller.8
> > diff --git a/controller/mac_cache.c b/controller/mac_cache.c
> > index 4663499a1..6f1d661d4 100644
> > --- a/controller/mac_cache.c
> > +++ b/controller/mac_cache.c
> > @@ -252,3 +252,87 @@ mac_cache_threshold_remove(struct hmap *thresholds,
> >       hmap_remove(thresholds, &threshold->hmap_node);
> >       free(threshold);
> >   }
> > +
> > +struct mac_cache_mb_stats {
> > +    struct ovs_list list_node;
> > +
> > +    int64_t idle_age_ms;
> > +    uint32_t cookie;
> > +    /* Common data to identify MAC binding. */
> > +    struct mac_cache_mb_data data;
> > +};
> > +
> > +void
> > +mac_cache_mb_stats_process_flow_stats(struct ovs_list *stats_list,
> > +                                      struct ofputil_flow_stats
> *ofp_stats)
> > +{
> > +    struct mac_cache_mb_stats *stats = xmalloc(sizeof *stats);
> > +
> > +    stats->idle_age_ms = ofp_stats->idle_age * 1000;
> > +    stats->cookie = ntohll(ofp_stats->cookie);
> > +    stats->data.port_key =
> > +            ofp_stats->match.flow.regs[MFF_LOG_INPORT - MFF_REG0];
> > +    stats->data.dp_key = ntohll(ofp_stats->match.flow.metadata);
> > +
> > +    if (ofp_stats->match.flow.dl_type == htons(ETH_TYPE_IP)) {
> > +        stats->data.ip =
> in6_addr_mapped_ipv4(ofp_stats->match.flow.nw_src);
> > +    } else {
> > +        stats->data.ip = ofp_stats->match.flow.ipv6_src;
> > +    }
> > +
> > +    stats->data.mac = ofp_stats->match.flow.dl_src;
> > +
> > +    ovs_list_push_back(stats_list, &stats->list_node);
> > +}
> > +
> > +void
> > +mac_cache_mb_stats_destroy(struct ovs_list *stats_list)
> > +{
> > +    struct mac_cache_mb_stats *stats;
> > +    LIST_FOR_EACH_POP (stats, list_node, stats_list) {
> > +        free(stats);
> > +    }
> > +}
> > +
> > +void
> > +mac_cache_mb_stats_run(struct ovs_list *stats_list, uint64_t *req_delay,
> > +                       void *data)
> > +{
> > +    struct mac_cache_data *cache_data = data;
> > +    long long timewall_now = time_wall_msec();
> > +
> > +    struct mac_cache_threshold *threshold;
> > +    struct mac_cache_mb_stats *stats;
> > +    struct mac_cache_mac_binding *mc_mb;
> > +    LIST_FOR_EACH_POP (stats, list_node, stats_list) {
> > +        mc_mb = mac_cache_mac_binding_find_by_mb_data(cache_data,
> > +                                                      &stats->data);
> > +
> > +        if (!mc_mb) {
> > +            free(stats);
> > +            continue;
> > +        }
> > +
> > +        struct uuid *dp_uuid = &mc_mb->sbrec_mb->datapath->header_.uuid;
> > +        threshold = mac_cache_threshold_find(&cache_data->mb_thresholds,
> > +                                             dp_uuid);
> > +
> > +        uint64_t dump_period = (3 * threshold->value) / 4;
>
> The dump period correlates directly with the configured threshold.
> Perhaps the dump period could be stored on the mac_cache_threshold and
> updated whenever the threshold->value is changed. This way, you would
> not have to calculate it twice every time this function is run.
>
> I also suggest offloading the dump_period calculation to a function so
> that it is easy to change if desired.
>

Yeah, that makes sense. This way the calculation will be in a single place,
and we probably don't need to add a separate function for it.


>
> > +        /* If "idle_age" is under threshold it means that the mac
> binding is
> > +         * used on this chassis. Also make sure that we don't update the
> > +         * timestamp more than once during the dump period. */
> > +        if (stats->idle_age_ms < threshold->value &&
> > +            (timewall_now - mc_mb->sbrec_mb->timestamp) >= dump_period)
> {
> > +            sbrec_mac_binding_set_timestamp(mc_mb->sbrec_mb,
> timewall_now);
> > +        }
> > +
> > +        free(stats);
> > +    }
> > +
> > +    uint64_t dump_period = UINT64_MAX;
> > +    HMAP_FOR_EACH (threshold, hmap_node, &cache_data->mb_thresholds) {
> > +        dump_period = MIN(dump_period, (3 * threshold->value) / 4);
> > +    }
> > +
> > +    *req_delay = dump_period < UINT64_MAX ? dump_period : 0;
> > +}
> > diff --git a/controller/mac_cache.h b/controller/mac_cache.h
> > index f1f1772c8..a29713908 100644
> > --- a/controller/mac_cache.h
> > +++ b/controller/mac_cache.h
> > @@ -71,4 +71,11 @@ void mac_cache_mac_binding_remove(struct
> mac_cache_data *data,
> >   void mac_cache_mac_bindings_destroy(struct mac_cache_data *data);
> >   bool mac_cache_sb_mac_binding_updated(const struct sbrec_mac_binding
> *mb);
> >
> > +void
> > +mac_cache_mb_stats_process_flow_stats(struct ovs_list *stats_list,
> > +                                      struct ofputil_flow_stats
> *ofp_stats);
> > +void mac_cache_mb_stats_destroy(struct ovs_list *stats_list);
> > +void mac_cache_mb_stats_run(struct ovs_list *stats_list, uint64_t
> *req_delay,
> > +                            void *data);
> > +
> >   #endif /* controller/mac_cache.h */
> > diff --git a/controller/ovn-controller.c b/controller/ovn-controller.c
> > index abb18647a..bdf7368b8 100644
> > --- a/controller/ovn-controller.c
> > +++ b/controller/ovn-controller.c
> > @@ -84,6 +84,7 @@
> >   #include "hmapx.h"
> >   #include "mirror.h"
> >   #include "mac_cache.h"
> > +#include "statctrl.h"
> >
> >   VLOG_DEFINE_THIS_MODULE(main);
> >
> > @@ -4804,6 +4805,7 @@ main(int argc, char *argv[])
> >       lflow_init();
> >       mirror_init();
> >       vif_plug_provider_initialize();
> > +    statctrl_init();
> >
> >       /* Connect to OVS OVSDB instance. */
> >       struct ovsdb_idl_loop ovs_idl_loop = OVSDB_IDL_LOOP_INITIALIZER(
> > @@ -5205,6 +5207,8 @@ main(int argc, char *argv[])
> >           engine_get_internal_data(&en_template_vars);
> >       struct ed_type_lb_data *lb_data =
> >           engine_get_internal_data(&en_lb_data);
> > +    struct mac_cache_data *mac_cache_data =
> > +            engine_get_internal_data(&en_mac_cache);
> >
> >       ofctrl_init(&lflow_output_data->group_table,
> >                   &lflow_output_data->meter_table,
> > @@ -5593,6 +5597,11 @@ main(int argc, char *argv[])
> >                           }
> >                       }
> >
> > +                    if (mac_cache_data) {
> > +                        statctrl_update(br_int->name);
> > +                        statctrl_run(ovnsb_idl_txn, mac_cache_data);
> > +                    }
> > +
> >                       ofctrl_seqno_update_create(
> >                           ofctrl_seq_type_nb_cfg,
> >                           get_nb_cfg(sbrec_sb_global_table_get(
> > @@ -5702,6 +5711,7 @@ main(int argc, char *argv[])
> >               if (br_int) {
> >                   ofctrl_wait();
> >                   pinctrl_wait(ovnsb_idl_txn);
> > +                statctrl_wait(ovnsb_idl_txn);
> >               }
> >
> >               binding_wait();
> > @@ -5836,6 +5846,7 @@ loop_done:
> >       patch_destroy();
> >       mirror_destroy();
> >       encaps_destroy();
> > +    statctrl_destroy();
> >       if_status_mgr_destroy(if_mgr);
> >       shash_destroy(&vif_plug_deleted_iface_ids);
> >       shash_destroy(&vif_plug_changed_iface_ids);
> > diff --git a/controller/statctrl.c b/controller/statctrl.c
> > new file mode 100644
> > index 000000000..9bef827fc
> > --- /dev/null
> > +++ b/controller/statctrl.c
> > @@ -0,0 +1,434 @@
> > +/* Copyright (c) 2023, Red Hat, Inc.
> > + *
> > + * Licensed under the Apache License, Version 2.0 (the "License");
> > + * you may not use this file except in compliance with the License.
> > + * You may obtain a copy of the License at:
> > + *
> > + *     http://www.apache.org/licenses/LICENSE-2.0
> > + *
> > + * Unless required by applicable law or agreed to in writing, software
> > + * distributed under the License is distributed on an "AS IS" BASIS,
> > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> implied.
> > + * See the License for the specific language governing permissions and
> > + * limitations under the License.
> > + */
> > +
> > +#include <config.h>
> > +
> > +#include "byte-order.h"
> > +#include "dirs.h"
> > +#include "latch.h"
> > +#include "lflow.h"
> > +#include "mac_cache.h"
> > +#include "openvswitch/ofp-errors.h"
> > +#include "openvswitch/ofp-flow.h"
> > +#include "openvswitch/ofp-msgs.h"
> > +#include "openvswitch/ofp-print.h"
> > +#include "openvswitch/ofp-util.h"
> > +#include "openvswitch/poll-loop.h"
> > +#include "openvswitch/rconn.h"
> > +#include "openvswitch/vlog.h"
> > +#include "ovn/logical-fields.h"
> > +#include "ovs-thread.h"
> > +#include "seq.h"
> > +#include "socket-util.h"
> > +#include "statctrl.h"
> > +
> > +VLOG_DEFINE_THIS_MODULE(statctrl);
> > +
> > +enum stat_type {
> > +    STATS_MAC_BINDING = 0,
> > +    STATS_MAX,
> > +};
> > +
> > +struct stats_node {
> > +    /* Table ID for the statistics request. */
> > +    uint8_t table_id;
>
> Instead of using a table_id here, would it make sense to store a struct
> ofputil_flow_stats_request instead? This way, if stats nodes care about
> group IDs or cookies, they can store them here.
>

That is a good idea, thanks.


>
> > +    /* xid of the last statistics request. */
> > +    ovs_be32 xid;
> > +    /* Timestamp when the next request should happen. */
> > +    int64_t next_request_timestamp;
> > +    /* Request delay in ms. */
> > +    uint64_t request_delay;
> > +    /* List of processed statistics. */
> > +    struct ovs_list stats_list;
> > +    /* Function to clean up the node.
> > +     * This function runs in main thread. */
> > +    void (*destroy)(struct ovs_list *stats_list);
> > +    /* Function to process the response and store it in the list.
> > +     * This function runs in statctrl thread locked behind mutex. */
> > +    void (*process_flow_stats)(struct ovs_list *stats_list,
> > +                               struct ofputil_flow_stats *ofp_stats);
> > +    /* Function to process the parsed stats.
> > +     * This function runs in main thread locked behind mutex. */
> > +    void (*run)(struct ovs_list *stats_list, uint64_t *req_delay, void
> *data);
> > +};
> > +
> > +#define STATS_NODE(NAME, TABLE_ID, DESTROY, PROCESS, RUN)
>     \
> > +    statctrl_ctx.nodes[STATS_##NAME] = (struct stats_node) {
>    \
> > +        .table_id = TABLE_ID,
>     \
> > +        .xid = 0,
>     \
> > +        .next_request_timestamp = INT64_MAX,
>    \
> > +        .request_delay = 0,
>     \
> > +        .stats_list =
>     \
> > +            OVS_LIST_INITIALIZER(
>     \
> > +                &statctrl_ctx.nodes[STATS_##NAME].stats_list),
>    \
> > +        .destroy = DESTROY,
>     \
> > +        .process_flow_stats = PROCESS,
>    \
> > +        .run = RUN
>    \
> > +    };
> > +
> > +struct statctrl_ctx {
> > +    char *br_int;
> > +
> > +    pthread_t thread;
> > +    struct latch exit_latch;
> > +
> > +    struct seq *thread_seq;
> > +    struct seq *main_seq;
> > +
> > +    struct stats_node nodes[STATS_MAX];
> > +};
> > +
> > +static struct statctrl_ctx statctrl_ctx;
> > +static struct ovs_mutex mutex;
> > +
> > +static void *statctrl_thread_handler(void *arg);
> > +static void statctrl_rconn_setup(struct rconn *swconn, char
> *conn_target)
> > +    OVS_REQUIRES(mutex);
> > +static void statctrl_handle_rconn_msg(struct rconn *swconn,
> > +                                      struct statctrl_ctx *ctx,
> > +                                      struct ofpbuf *msg);
> > +static enum stat_type statctrl_get_stat_type(struct statctrl_ctx *ctx,
> > +                                             const struct ofp_header
> *oh);
> > +static void statctrl_decode_statistics_reply(struct stats_node *node,
> > +                                             struct ofpbuf *msg)
> > +    OVS_REQUIRES(mutex);
> > +static void statctrl_send_request(struct rconn *swconn,
> > +                                  struct statctrl_ctx *ctx)
> > +    OVS_REQUIRES(mutex);
> > +static void statctrl_notify_main_thread(struct statctrl_ctx *ctx);
> > +static void statctrl_set_conn_target(const char *br_int_name)
> > +    OVS_REQUIRES(mutex);
> > +static void statctrl_wait_next_request(struct statctrl_ctx *ctx)
> > +    OVS_REQUIRES(mutex);
> > +static bool statctrl_update_next_request_timestamp(struct stats_node
> *node,
> > +                                                   long long now,
> > +                                                   uint64_t prev_delay)
> > +    OVS_REQUIRES(mutex);
> > +
> > +void
> > +statctrl_init(void)
> > +{
> > +    statctrl_ctx.br_int = NULL;
> > +    latch_init(&statctrl_ctx.exit_latch);
> > +    ovs_mutex_init(&mutex);
> > +    statctrl_ctx.thread_seq = seq_create();
> > +    statctrl_ctx.main_seq = seq_create();
> > +
> > +    /* Definition of all stat nodes. */
> > +    STATS_NODE(MAC_BINDING, OFTABLE_MAC_CACHE_USE,
> mac_cache_mb_stats_destroy,
> > +               mac_cache_mb_stats_process_flow_stats,
> mac_cache_mb_stats_run);
> > +
> > +
> > +    statctrl_ctx.thread = ovs_thread_create("ovn_statctrl",
> > +                                            statctrl_thread_handler,
> > +                                            &statctrl_ctx);
> > +}
> > +
> > +void
> > +statctrl_run(struct ovsdb_idl_txn *ovnsb_idl_txn,
> > +             struct mac_cache_data *mac_cache_data)
> > +{
> > +    if (!ovnsb_idl_txn) {
> > +        return;
> > +    }
> > +
> > +    void *node_data[STATS_MAX] = {mac_cache_data};
>
> I was thinking about what happens when more than just the MAC cache
> needs to collect flow statistics. Based on this initial design, my
> assumption is that the function signature for statctrl_run() would grow
> as each new stats node type is defined:
>
> void
> statctrl_run(struct ovsdb_idl_txn *ovnsb_idl_txn,
>               struct mac_cache_data *mac_cache_data,
>               struct foo_data *foo_data,
>               struct bar_data *bar_data)
>
> Is that the intention? I suppose this will be OK as long as the number
> of stats nodes stays low. But if we suspect that we will end up with a
> lot more, then we may want to switch to passing in, say, a shash of data
> instead of each individual stats node data type. My biggest concern here
> is merge conflicts when trying to backport patches.
>
> Or did you have a different idea in mind for how new stats data types
> would be handled?
>

Yeah, that is the original intention, as I don't anticipate that it will grow
too much. For example, initially there will be only one argument, because
mac_cache_data is planned to be used for both MAC binding and FDB.


>
> > +
> > +    bool schedule_updated = false;
> > +    long long now = time_msec();
> > +
> > +    ovs_mutex_lock(&mutex);
> > +    for (size_t i = 0; i < STATS_MAX; i++) {
> > +        struct stats_node *node = &statctrl_ctx.nodes[i];
> > +        uint64_t prev_delay = node->request_delay;
> > +
> > +        node->run(&node->stats_list, &node->request_delay,
> node_data[i]);
> > +
> > +        schedule_updated |=
> > +                statctrl_update_next_request_timestamp(node, now,
> prev_delay);
> > +    }
> > +    ovs_mutex_unlock(&mutex);
> > +
> > +    if (schedule_updated) {
> > +        seq_change(statctrl_ctx.thread_seq);
> > +    }
> > +}
> > +
> > +void
> > +statctrl_update(const char *br_int_name)
> > +{
> > +    ovs_mutex_lock(&mutex);
> > +    statctrl_set_conn_target(br_int_name);
> > +    ovs_mutex_unlock(&mutex);
> > +}
> > +
> > +void
> > +statctrl_wait(struct ovsdb_idl_txn *ovnsb_idl_txn)
> > +{
> > +    if (!ovnsb_idl_txn) {
> > +        return;
> > +    }
> > +
> > +    ovs_mutex_lock(&mutex);
> > +    for (size_t i = 0; i < STATS_MAX; i++) {
> > +        struct stats_node *node = &statctrl_ctx.nodes[i];
> > +        if (!ovs_list_is_empty(&node->stats_list)) {
> > +            poll_immediate_wake();
> > +        }
> > +    }
> > +    int64_t new_seq = seq_read(statctrl_ctx.main_seq);
> > +    seq_wait(statctrl_ctx.main_seq, new_seq);
> > +    ovs_mutex_unlock(&mutex);
> > +}
> > +
> > +void
> > +statctrl_destroy(void)
> > +{
> > +    latch_set(&statctrl_ctx.exit_latch);
> > +    pthread_join(statctrl_ctx.thread, NULL);
> > +    latch_destroy(&statctrl_ctx.exit_latch);
> > +    free(statctrl_ctx.br_int);
> > +    seq_destroy(statctrl_ctx.thread_seq);
> > +    seq_destroy(statctrl_ctx.main_seq);
> > +
> > +    for (size_t i = 0; i < STATS_MAX; i++) {
> > +        struct stats_node *node = &statctrl_ctx.nodes[i];
> > +        node->destroy(&node->stats_list);
> > +    }
> > +}
> > +
> > +static void *
> > +statctrl_thread_handler(void *arg)
> > +{
> > +    struct statctrl_ctx *ctx = arg;
> > +
> > +    /* OpenFlow connection to the switch. */
> > +    struct rconn *swconn = rconn_create(5, 0, DSCP_DEFAULT,
> > +                                        1 << OFP15_VERSION);
> > +
> > +    while (!latch_is_set(&ctx->exit_latch)) {
> > +        ovs_mutex_lock(&mutex);
> > +        statctrl_rconn_setup(swconn, ctx->br_int);
> > +        ovs_mutex_unlock(&mutex);
> > +
> > +        rconn_run(swconn);
> > +        uint64_t new_seq = seq_read(ctx->thread_seq);
> > +
> > +        if (rconn_is_connected(swconn)) {
> > +            for (int i = 0; i < 100; i++) {
> > +                struct ofpbuf *msg = rconn_recv(swconn);
> > +
> > +                if (!msg) {
> > +                    break;
> > +                }
> > +
> > +                statctrl_handle_rconn_msg(swconn, ctx, msg);
> > +                ofpbuf_delete(msg);
> > +            }
> > +
> > +            ovs_mutex_lock(&mutex);
> > +            statctrl_send_request(swconn, ctx);
> > +            ovs_mutex_unlock(&mutex);
> > +        }
> > +
> > +        statctrl_notify_main_thread(ctx);
> > +        rconn_run_wait(swconn);
> > +        rconn_recv_wait(swconn);
> > +        ovs_mutex_lock(&mutex);
> > +        statctrl_wait_next_request(ctx);
> > +        ovs_mutex_unlock(&mutex);
> > +        seq_wait(ctx->thread_seq, new_seq);
> > +        latch_wait(&ctx->exit_latch);
> > +
> > +        poll_block();
> > +    }
> > +
> > +    rconn_destroy(swconn);
> > +    return NULL;
> > +}
> > +
> > +static void
> > +statctrl_rconn_setup(struct rconn *swconn, char *br_int)
> > +    OVS_REQUIRES(mutex)
> > +{
> > +    if (!br_int) {
> > +        rconn_disconnect(swconn);
> > +        return;
> > +    }
> > +
> > +    char *conn_target = xasprintf("unix:%s/%s.mgmt", ovs_rundir(),
> br_int);
> > +
> > +    if (strcmp(conn_target, rconn_get_target(swconn))) {
> > +        VLOG_INFO("%s: connecting to switch", conn_target);
> > +        rconn_connect(swconn, conn_target, conn_target);
> > +    }
> > +
> > +    free(conn_target);
> > +}
> > +
> > +static void
> > +statctrl_handle_rconn_msg(struct rconn *swconn, struct statctrl_ctx
> *ctx,
> > +                          struct ofpbuf *msg)
> > +{
> > +    enum ofptype type;
> > +    const struct ofp_header *oh = msg->data;
> > +
> > +    ofptype_decode(&type, oh);
> > +
> > +    if (type == OFPTYPE_ECHO_REQUEST) {
> > +        rconn_send(swconn, ofputil_encode_echo_reply(oh), NULL);
> > +    } else if (type == OFPTYPE_FLOW_STATS_REPLY) {
> > +        enum stat_type stype = statctrl_get_stat_type(ctx, oh);
> > +        if (stype == STATS_MAX) {
> > +            return;
> > +        }
> > +
> > +        ovs_mutex_lock(&mutex);
> > +        statctrl_decode_statistics_reply(&ctx->nodes[stype], msg);
> > +        ovs_mutex_unlock(&mutex);
> > +    } else {
> > +        if (VLOG_IS_DBG_ENABLED()) {
> > +
> > +            static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(30,
> 300);
> > +
> > +            char *s = ofp_to_string(oh, ntohs(oh->length), NULL, NULL,
> 2);
> > +
> > +            VLOG_DBG_RL(&rl, "OpenFlow packet ignored: %s", s);
> > +            free(s);
> > +        }
> > +    }
> > +}
> > +
> > +static enum stat_type
> > +statctrl_get_stat_type(struct statctrl_ctx *ctx, const struct
> ofp_header *oh)
> > +{
> > +    for (size_t i = 0; i < STATS_MAX; i++) {
> > +        if (ctx->nodes[i].xid == oh->xid) {
> > +            return i;
> > +        }
> > +    }
> > +    return STATS_MAX;
> > +}
> > +
> > +static void
> > +statctrl_decode_statistics_reply(struct stats_node *node, struct ofpbuf
> *msg)
> > +    OVS_REQUIRES(mutex)
> > +{
> > +    struct ofpbuf ofpacts;
> > +    ofpbuf_init(&ofpacts, 0);
> > +
> > +    while (true) {
> > +        struct ofputil_flow_stats fs;
> > +
> > +        int error = ofputil_decode_flow_stats_reply(&fs, msg, true,
> &ofpacts);
> > +        if (error == EOF) {
> > +            break;
> > +        } else if (error) {
> > +            VLOG_DBG("Couldn't parse stat reply: %s",
> ofperr_to_string(error));
> > +            break;
> > +        }
> > +
> > +        node->process_flow_stats(&node->stats_list, &fs);
> > +    }
> > +
> > +    ofpbuf_uninit(&ofpacts);
> > +}
> > +
> > +static void
> > +statctrl_send_request(struct rconn *swconn, struct statctrl_ctx *ctx)
> > +    OVS_REQUIRES(mutex)
> > +{
> > +    long long now = time_msec();
> > +    enum ofp_version version = rconn_get_version(swconn);
> > +    enum ofputil_protocol proto =
> ofputil_protocol_from_ofp_version(version);
> > +
> > +    for (size_t i = 0; i < STATS_MAX; i++) {
> > +        struct stats_node *node = &ctx->nodes[i];
> > +
> > +        if (now < node->next_request_timestamp) {
> > +            continue;
> > +        }
> > +
> > +        struct ofputil_flow_stats_request fsr = {
> > +                .cookie = htonll(0),
> > +                .cookie_mask = htonll(0),
> > +                .out_port = OFPP_ANY,
> > +                .out_group = OFPG_ANY,
> > +                .table_id = node->table_id,
> > +        };
> > +        struct ofpbuf *msg =
> ofputil_encode_flow_stats_request(&fsr,
> proto);
> > +        node->xid = ((struct ofp_header *) msg->data)->xid;
> > +
> > +        statctrl_update_next_request_timestamp(node, now, 0);
> > +
> > +        rconn_send(swconn, msg, NULL);
> > +    }
> > +}
> > +
> > +static void
> > +statctrl_notify_main_thread(struct statctrl_ctx *ctx)
> > +{
> > +    for (size_t i = 0; i < STATS_MAX; i++) {
> > +        if (!ovs_list_is_empty(&ctx->nodes[i].stats_list)) {
> > +            seq_change(ctx->main_seq);
> > +            return;
> > +        }
> > +    }
> > +}
> > +
> > +static void
> > +statctrl_set_conn_target(const char *br_int_name)
> > +    OVS_REQUIRES(mutex)
> > +{
> > +    if (!br_int_name) {
> > +        return;
> > +    }
> > +
> > +
> > +    if (!statctrl_ctx.br_int || strcmp(statctrl_ctx.br_int,
> br_int_name)) {
> > +        free(statctrl_ctx.br_int);
> > +        statctrl_ctx.br_int = xstrdup(br_int_name);
> > +        /* Notify statctrl thread that integration bridge is
> set/changed. */
> > +        seq_change(statctrl_ctx.thread_seq);
> > +    }
> > +}
> > +
> > +static void
> > +statctrl_wait_next_request(struct statctrl_ctx *ctx)
> > +    OVS_REQUIRES(mutex)
> > +{
> > +    for (size_t i = 0; i < STATS_MAX; i++) {
> > +        int64_t timestamp = ctx->nodes[i].next_request_timestamp;
> > +        if (timestamp < INT64_MAX) {
> > +            poll_timer_wait_until(timestamp);
> > +        }
> > +    }
> > +}
> > +
> > +static bool
> > +statctrl_update_next_request_timestamp(struct stats_node *node,
> > +                                       long long now, uint64_t
> prev_delay)
> > +{
> > +    if (!node->request_delay) {
> > +        node->next_request_timestamp = INT64_MAX;
> > +        return false;
> > +    }
> > +
> > +    int64_t timestamp = prev_delay ? node->next_request_timestamp : now;
> > +    node->next_request_timestamp =
> > +            timestamp + node->request_delay - prev_delay;
> > +
> > +    return timestamp != node->next_request_timestamp;
> > +}
> > diff --git a/controller/statctrl.h b/controller/statctrl.h
> > new file mode 100644
> > index 000000000..c5cede353
> > --- /dev/null
> > +++ b/controller/statctrl.h
> > @@ -0,0 +1,28 @@
> > +/* Copyright (c) 2023, Red Hat, Inc.
> > + *
> > + * Licensed under the Apache License, Version 2.0 (the "License");
> > + * you may not use this file except in compliance with the License.
> > + * You may obtain a copy of the License at:
> > + *
> > + *     http://www.apache.org/licenses/LICENSE-2.0
> > + *
> > + * Unless required by applicable law or agreed to in writing, software
> > + * distributed under the License is distributed on an "AS IS" BASIS,
> > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> implied.
> > + * See the License for the specific language governing permissions and
> > + * limitations under the License.
> > + */
> > +
> > +#ifndef STATCTRL_H
> > +#define STATCTRL_H
> > +
> > +#include "mac_cache.h"
> > +
> > +void statctrl_init(void);
> > +void statctrl_run(struct ovsdb_idl_txn *ovnsb_idl_txn,
> > +                  struct mac_cache_data *mac_cache_data);
> > +void statctrl_update(const char *br_int_name);
> > +void statctrl_wait(struct ovsdb_idl_txn *ovnsb_idl_txn);
> > +void statctrl_destroy(void);
> > +
> > +#endif /* controller/statctrl.h */
> > diff --git a/tests/ovn.at b/tests/ovn.at
> > index 7cee8a175..f4c28c57b 100644
> > --- a/tests/ovn.at
> > +++ b/tests/ovn.at
> > @@ -34556,8 +34556,10 @@ AT_CHECK([fetch_column nb:logical_router
> options name="gw" | grep -q mac_binding
> >   send_garp hv1 ext1 10
> >   send_garp hv2 ext2 20
> >
> > -OVS_WAIT_UNTIL([ovn-sbctl list mac_binding | grep -q "192.168.10.10"])
> > -OVS_WAIT_UNTIL([ovn-sbctl list mac_binding | grep -q "192.168.10.20"])
> > +wait_row_count mac_binding 1 ip="192.168.10.10"
> > +wait_row_count mac_binding 1 ip="192.168.10.20"
> > +
> > +timestamp=$(fetch_column mac_binding timestamp ip="192.168.10.20")
> >
> >   send_udp hv1 ext1 10
> >   send_udp hv2 ext2 20
> > @@ -34566,16 +34568,27 @@ OVS_WAIT_UNTIL([as hv1 ovs-ofctl dump-flows
> br-int table=79 | grep "192.168.10.1
> >   OVS_WAIT_UNTIL([as hv2 ovs-ofctl dump-flows br-int table=79 | grep
> "192.168.10.20" | grep -q "n_packets=1"])
> >
> >   # Set the MAC binding aging threshold
> > -AT_CHECK([ovn-nbctl set logical_router gw
> options:mac_binding_age_threshold=1])
> > -AT_CHECK([fetch_column nb:logical_router options | grep -q
> mac_binding_age_threshold=1])
> > +AT_CHECK([ovn-nbctl set logical_router gw
> options:mac_binding_age_threshold=5])
> > +AT_CHECK([fetch_column nb:logical_router options | grep -q
> mac_binding_age_threshold=5])
> >   AT_CHECK([ovn-nbctl --wait=sb sync])
> >
> > +# Wait send few packets for "192.168.10.20" to indicate that it is
> still in use
> > +send_udp hv2 ext2 20
> > +sleep 1
> > +send_udp hv2 ext2 20
> > +
> >   # Set the timeout for OVS_WAIT* functions to 5 seconds
> >   OVS_CTL_TIMEOUT=5
> > +OVS_WAIT_UNTIL([
> > +    test "$timestamp" != "$(fetch_column mac_binding timestamp
> ip='192.168.10.20')"
> > +])
> > +check $(test "$(fetch_column mac_binding timestamp ip='192.168.10.20')"
> != "")
> > +
> >   # Check if the records are removed after some inactivity
> >   OVS_WAIT_UNTIL([
> >       test "0" = "$(ovn-sbctl list mac_binding | grep -c
> '192.168.10.10')"
> >   ])
> > +# The second one takes longer because it got refreshed
> >   OVS_WAIT_UNTIL([
> >       test "0" = "$(ovn-sbctl list mac_binding | grep -c
> '192.168.10.20')"
> >   ])
> > @@ -35257,7 +35270,7 @@ check ovs-vsctl add-br br-phys
> >   ovn_attach n1 br-phys 192.168.0.1
> >
> >   dnl Ensure that there are at least 3 openflow connections.
>
> Nit: The comment is already incorrect since it checks for an exact
> number of connections instead of "at least" some number. But it also
> needs to be updated to say 4 instead of 3.
>
>
Ah right, it will be fixed in v2.


>
> > -OVS_WAIT_UNTIL([test "$(grep -c 'negotiated OpenFlow version'
> hv1/ovs-vswitchd.log)" -eq "3"])
> > +OVS_WAIT_UNTIL([test "$(grep -c 'negotiated OpenFlow version'
> hv1/ovs-vswitchd.log)" -eq "4"])
> >
> >   dnl "Wait" 3 times 60 seconds and ensure ovn-controller writes to the
> >   dnl openflow connections in the meantime.  This should allow
> ovs-vswitchd
>
>
Thanks,
Ales
diff mbox series

Patch

diff --git a/controller/automake.mk b/controller/automake.mk
index 562290359..0dbbd5d26 100644
--- a/controller/automake.mk
+++ b/controller/automake.mk
@@ -45,7 +45,9 @@  controller_ovn_controller_SOURCES = \
 	controller/mirror.h \
 	controller/mirror.c \
 	controller/mac_cache.h \
-	controller/mac_cache.c
+	controller/mac_cache.c \
+	controller/statctrl.h \
+	controller/statctrl.c
 
 controller_ovn_controller_LDADD = lib/libovn.la $(OVS_LIBDIR)/libopenvswitch.la
 man_MANS += controller/ovn-controller.8
diff --git a/controller/mac_cache.c b/controller/mac_cache.c
index 4663499a1..6f1d661d4 100644
--- a/controller/mac_cache.c
+++ b/controller/mac_cache.c
@@ -252,3 +252,87 @@  mac_cache_threshold_remove(struct hmap *thresholds,
     hmap_remove(thresholds, &threshold->hmap_node);
     free(threshold);
 }
+
+struct mac_cache_mb_stats {
+    struct ovs_list list_node;
+
+    int64_t idle_age_ms;
+    uint32_t cookie;
+    /* Common data to identify MAC binding. */
+    struct mac_cache_mb_data data;
+};
+
+void
+mac_cache_mb_stats_process_flow_stats(struct ovs_list *stats_list,
+                                      struct ofputil_flow_stats *ofp_stats)
+{
+    struct mac_cache_mb_stats *stats = xmalloc(sizeof *stats);
+
+    stats->idle_age_ms = ofp_stats->idle_age * 1000;
+    stats->cookie = ntohll(ofp_stats->cookie);
+    stats->data.port_key =
+            ofp_stats->match.flow.regs[MFF_LOG_INPORT - MFF_REG0];
+    stats->data.dp_key = ntohll(ofp_stats->match.flow.metadata);
+
+    if (ofp_stats->match.flow.dl_type == htons(ETH_TYPE_IP)) {
+        stats->data.ip = in6_addr_mapped_ipv4(ofp_stats->match.flow.nw_src);
+    } else {
+        stats->data.ip = ofp_stats->match.flow.ipv6_src;
+    }
+
+    stats->data.mac = ofp_stats->match.flow.dl_src;
+
+    ovs_list_push_back(stats_list, &stats->list_node);
+}
+
+void
+mac_cache_mb_stats_destroy(struct ovs_list *stats_list)
+{
+    struct mac_cache_mb_stats *stats;
+    LIST_FOR_EACH_POP (stats, list_node, stats_list) {
+        free(stats);
+    }
+}
+
+void
+mac_cache_mb_stats_run(struct ovs_list *stats_list, uint64_t *req_delay,
+                       void *data)
+{
+    struct mac_cache_data *cache_data = data;
+    long long timewall_now = time_wall_msec();
+
+    struct mac_cache_threshold *threshold;
+    struct mac_cache_mb_stats *stats;
+    struct mac_cache_mac_binding *mc_mb;
+    LIST_FOR_EACH_POP (stats, list_node, stats_list) {
+        mc_mb = mac_cache_mac_binding_find_by_mb_data(cache_data,
+                                                      &stats->data);
+
+        if (!mc_mb) {
+            free(stats);
+            continue;
+        }
+
+        struct uuid *dp_uuid = &mc_mb->sbrec_mb->datapath->header_.uuid;
+        threshold = mac_cache_threshold_find(&cache_data->mb_thresholds,
+                                             dp_uuid);
+
+        uint64_t dump_period = (3 * threshold->value) / 4;
+        /* If "idle_age" is under threshold it means that the mac binding is
+         * used on this chassis. Also make sure that we don't update the
+         * timestamp more than once during the dump period. */
+        if (stats->idle_age_ms < threshold->value &&
+            (timewall_now - mc_mb->sbrec_mb->timestamp) >= dump_period) {
+            sbrec_mac_binding_set_timestamp(mc_mb->sbrec_mb, timewall_now);
+        }
+
+        free(stats);
+    }
+
+    uint64_t dump_period = UINT64_MAX;
+    HMAP_FOR_EACH (threshold, hmap_node, &cache_data->mb_thresholds) {
+        dump_period = MIN(dump_period, (3 * threshold->value) / 4);
+    }
+
+    *req_delay = dump_period < UINT64_MAX ? dump_period : 0;
+}
diff --git a/controller/mac_cache.h b/controller/mac_cache.h
index f1f1772c8..a29713908 100644
--- a/controller/mac_cache.h
+++ b/controller/mac_cache.h
@@ -71,4 +71,11 @@  void mac_cache_mac_binding_remove(struct mac_cache_data *data,
 void mac_cache_mac_bindings_destroy(struct mac_cache_data *data);
 bool mac_cache_sb_mac_binding_updated(const struct sbrec_mac_binding *mb);
 
+void
+mac_cache_mb_stats_process_flow_stats(struct ovs_list *stats_list,
+                                      struct ofputil_flow_stats *ofp_stats);
+void mac_cache_mb_stats_destroy(struct ovs_list *stats_list);
+void mac_cache_mb_stats_run(struct ovs_list *stats_list, uint64_t *req_delay,
+                            void *data);
+
 #endif /* controller/mac_cache.h */
diff --git a/controller/ovn-controller.c b/controller/ovn-controller.c
index abb18647a..bdf7368b8 100644
--- a/controller/ovn-controller.c
+++ b/controller/ovn-controller.c
@@ -84,6 +84,7 @@ 
 #include "hmapx.h"
 #include "mirror.h"
 #include "mac_cache.h"
+#include "statctrl.h"
 
 VLOG_DEFINE_THIS_MODULE(main);
 
@@ -4804,6 +4805,7 @@  main(int argc, char *argv[])
     lflow_init();
     mirror_init();
     vif_plug_provider_initialize();
+    statctrl_init();
 
     /* Connect to OVS OVSDB instance. */
     struct ovsdb_idl_loop ovs_idl_loop = OVSDB_IDL_LOOP_INITIALIZER(
@@ -5205,6 +5207,8 @@  main(int argc, char *argv[])
         engine_get_internal_data(&en_template_vars);
     struct ed_type_lb_data *lb_data =
         engine_get_internal_data(&en_lb_data);
+    struct mac_cache_data *mac_cache_data =
+            engine_get_internal_data(&en_mac_cache);
 
     ofctrl_init(&lflow_output_data->group_table,
                 &lflow_output_data->meter_table,
@@ -5593,6 +5597,11 @@  main(int argc, char *argv[])
                         }
                     }
 
+                    if (mac_cache_data) {
+                        statctrl_update(br_int->name);
+                        statctrl_run(ovnsb_idl_txn, mac_cache_data);
+                    }
+
                     ofctrl_seqno_update_create(
                         ofctrl_seq_type_nb_cfg,
                         get_nb_cfg(sbrec_sb_global_table_get(
@@ -5702,6 +5711,7 @@  main(int argc, char *argv[])
             if (br_int) {
                 ofctrl_wait();
                 pinctrl_wait(ovnsb_idl_txn);
+                statctrl_wait(ovnsb_idl_txn);
             }
 
             binding_wait();
@@ -5836,6 +5846,7 @@  loop_done:
     patch_destroy();
     mirror_destroy();
     encaps_destroy();
+    statctrl_destroy();
     if_status_mgr_destroy(if_mgr);
     shash_destroy(&vif_plug_deleted_iface_ids);
     shash_destroy(&vif_plug_changed_iface_ids);
diff --git a/controller/statctrl.c b/controller/statctrl.c
new file mode 100644
index 000000000..9bef827fc
--- /dev/null
+++ b/controller/statctrl.c
@@ -0,0 +1,434 @@ 
+/* Copyright (c) 2023, Red Hat, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <config.h>
+
+#include "byte-order.h"
+#include "dirs.h"
+#include "latch.h"
+#include "lflow.h"
+#include "mac_cache.h"
+#include "openvswitch/ofp-errors.h"
+#include "openvswitch/ofp-flow.h"
+#include "openvswitch/ofp-msgs.h"
+#include "openvswitch/ofp-print.h"
+#include "openvswitch/ofp-util.h"
+#include "openvswitch/poll-loop.h"
+#include "openvswitch/rconn.h"
+#include "openvswitch/vlog.h"
+#include "ovn/logical-fields.h"
+#include "ovs-thread.h"
+#include "seq.h"
+#include "socket-util.h"
+#include "statctrl.h"
+
+VLOG_DEFINE_THIS_MODULE(statctrl);
+
+enum stat_type {
+    STATS_MAC_BINDING = 0,
+    STATS_MAX,
+};
+
+struct stats_node {
+    /* OpenFlow table ID the statistics request is sent for. */
+    uint8_t table_id;
+    /* xid of the last statistics request, used to match the reply. */
+    ovs_be32 xid;
+    /* Time (ms) of the next request, INT64_MAX when none is scheduled. */
+    int64_t next_request_timestamp;
+    /* Delay between the requests in ms, 0 means no requests. */
+    uint64_t request_delay;
+    /* Statistics stored by 'process_flow_stats' and handed to 'run'. */
+    struct ovs_list stats_list;
+    /* Function to clean up the node.
+     * This function runs in the main thread. */
+    void (*destroy)(struct ovs_list *stats_list);
+    /* Function to process the response and store it in the list.
+     * This function runs in the statctrl thread with 'mutex' held. */
+    void (*process_flow_stats)(struct ovs_list *stats_list,
+                               struct ofputil_flow_stats *ofp_stats);
+    /* Function to process the parsed stats, it may update '*req_delay'.
+     * This function runs in the main thread with 'mutex' held. */
+    void (*run)(struct ovs_list *stats_list, uint64_t *req_delay, void *data);
+};
+
+/* Initializes the 'statctrl_ctx.nodes' entry of the STATS_<NAME> type. */
+#define STATS_NODE(NAME, TABLE_ID, DESTROY, PROCESS, RUN)                  \
+    statctrl_ctx.nodes[STATS_##NAME] = (struct stats_node) {               \
+        .table_id = TABLE_ID,                                              \
+        .xid = 0,                                                          \
+        .next_request_timestamp = INT64_MAX,                               \
+        .request_delay = 0,                                                \
+        .stats_list = OVS_LIST_INITIALIZER(                                \
+            &statctrl_ctx.nodes[STATS_##NAME].stats_list),                 \
+        .destroy = DESTROY,                                                \
+        .process_flow_stats = PROCESS,                                     \
+        .run = RUN                                                         \
+    }
+
+struct statctrl_ctx {
+    char *br_int; /* Name of the integration bridge to connect to. */
+    /* The statctrl thread and the latch that tells it to exit. */
+    pthread_t thread;
+    struct latch exit_latch;
+    /* 'thread_seq' wakes up the statctrl thread, 'main_seq' the main one. */
+    struct seq *thread_seq;
+    struct seq *main_seq;
+    /* One node per stat type, indexed by 'enum stat_type'. */
+    struct stats_node nodes[STATS_MAX];
+};
+
+static struct statctrl_ctx statctrl_ctx;
+static struct ovs_mutex mutex; /* Synchronizes main and statctrl threads. */
+
+static void *statctrl_thread_handler(void *arg);
+static void statctrl_rconn_setup(struct rconn *swconn, char *br_int)
+    OVS_REQUIRES(mutex);
+static void statctrl_handle_rconn_msg(struct rconn *swconn,
+                                      struct statctrl_ctx *ctx,
+                                      struct ofpbuf *msg);
+static enum stat_type statctrl_get_stat_type(struct statctrl_ctx *ctx,
+                                             const struct ofp_header *oh);
+static void statctrl_decode_statistics_reply(struct stats_node *node,
+                                             struct ofpbuf *msg)
+    OVS_REQUIRES(mutex);
+static void statctrl_send_request(struct rconn *swconn,
+                                  struct statctrl_ctx *ctx)
+    OVS_REQUIRES(mutex);
+static void statctrl_notify_main_thread(struct statctrl_ctx *ctx);
+static void statctrl_set_conn_target(const char *br_int_name)
+    OVS_REQUIRES(mutex);
+static void statctrl_wait_next_request(struct statctrl_ctx *ctx)
+    OVS_REQUIRES(mutex);
+static bool statctrl_update_next_request_timestamp(struct stats_node *node,
+                                                   long long now,
+                                                   uint64_t prev_delay)
+    OVS_REQUIRES(mutex);
+
+void
+statctrl_init(void)
+{
+    statctrl_ctx.br_int = NULL;
+    latch_init(&statctrl_ctx.exit_latch);
+    ovs_mutex_init(&mutex);
+    statctrl_ctx.thread_seq = seq_create();
+    statctrl_ctx.main_seq = seq_create();
+
+    /* Definition of all stat nodes. */
+    STATS_NODE(MAC_BINDING, OFTABLE_MAC_CACHE_USE, mac_cache_mb_stats_destroy,
+               mac_cache_mb_stats_process_flow_stats, mac_cache_mb_stats_run);
+
+    /* The thread gets 'statctrl_ctx' as its argument, so start it last. */
+    statctrl_ctx.thread = ovs_thread_create("ovn_statctrl",
+                                            statctrl_thread_handler,
+                                            &statctrl_ctx);
+}
+
+void
+statctrl_run(struct ovsdb_idl_txn *ovnsb_idl_txn,
+             struct mac_cache_data *mac_cache_data)
+{
+    if (!ovnsb_idl_txn) {
+        return;
+    }
+    /* Per-node 'data' argument for 'run', indexed by 'enum stat_type'. */
+    void *node_data[STATS_MAX] = {mac_cache_data};
+
+    bool schedule_updated = false;
+    long long now = time_msec();
+    /* Let every node process its collected stats, then reschedule it. */
+    ovs_mutex_lock(&mutex);
+    for (size_t i = 0; i < STATS_MAX; i++) {
+        struct stats_node *node = &statctrl_ctx.nodes[i];
+        uint64_t prev_delay = node->request_delay;
+
+        node->run(&node->stats_list, &node->request_delay, node_data[i]);
+
+        schedule_updated |=
+                statctrl_update_next_request_timestamp(node, now, prev_delay);
+    }
+    ovs_mutex_unlock(&mutex);
+    /* Wake up the statctrl thread so it picks up the new schedule. */
+    if (schedule_updated) {
+        seq_change(statctrl_ctx.thread_seq);
+    }
+}
+
+void
+statctrl_update(const char *br_int_name)
+{
+    ovs_mutex_lock(&mutex);
+    statctrl_set_conn_target(br_int_name);
+    ovs_mutex_unlock(&mutex);
+}
+
+void
+statctrl_wait(struct ovsdb_idl_txn *ovnsb_idl_txn)
+{
+    if (!ovnsb_idl_txn) {
+        return;
+    }
+    /* Don't sleep while there are stats waiting to be processed. */
+    ovs_mutex_lock(&mutex);
+    for (size_t i = 0; i < STATS_MAX; i++) {
+        struct stats_node *node = &statctrl_ctx.nodes[i];
+        if (!ovs_list_is_empty(&node->stats_list)) {
+            poll_immediate_wake();
+        }
+    }
+    int64_t new_seq = seq_read(statctrl_ctx.main_seq);
+    seq_wait(statctrl_ctx.main_seq, new_seq);
+    ovs_mutex_unlock(&mutex);
+}
+
+void
+statctrl_destroy(void)
+{
+    /* Stop the statctrl thread before releasing what it uses. */
+    latch_set(&statctrl_ctx.exit_latch);
+    pthread_join(statctrl_ctx.thread, NULL);
+    latch_destroy(&statctrl_ctx.exit_latch);
+    free(statctrl_ctx.br_int);
+    seq_destroy(statctrl_ctx.thread_seq);
+    seq_destroy(statctrl_ctx.main_seq);
+    ovs_mutex_destroy(&mutex);
+    for (size_t i = 0; i < STATS_MAX; i++) {
+        statctrl_ctx.nodes[i].destroy(&statctrl_ctx.nodes[i].stats_list);
+    }
+}
+
+static void *
+statctrl_thread_handler(void *arg)
+{
+    struct statctrl_ctx *ctx = arg;
+
+    /* OpenFlow connection to the switch. */
+    struct rconn *swconn = rconn_create(5, 0, DSCP_DEFAULT,
+                                        1 << OFP15_VERSION);
+
+    while (!latch_is_set(&ctx->exit_latch)) {
+        ovs_mutex_lock(&mutex);
+        statctrl_rconn_setup(swconn, ctx->br_int);
+        ovs_mutex_unlock(&mutex);
+
+        rconn_run(swconn);
+        uint64_t new_seq = seq_read(ctx->thread_seq);
+        /* Handle up to 100 messages per iteration. */
+        if (rconn_is_connected(swconn)) {
+            for (int i = 0; i < 100; i++) {
+                struct ofpbuf *msg = rconn_recv(swconn);
+
+                if (!msg) {
+                    break;
+                }
+
+                statctrl_handle_rconn_msg(swconn, ctx, msg);
+                ofpbuf_delete(msg);
+            }
+            /* Send the requests that are due. */
+            ovs_mutex_lock(&mutex);
+            statctrl_send_request(swconn, ctx);
+            ovs_mutex_unlock(&mutex);
+        }
+        /* Notify the main thread and register the wake-up events. */
+        statctrl_notify_main_thread(ctx);
+        rconn_run_wait(swconn);
+        rconn_recv_wait(swconn);
+        ovs_mutex_lock(&mutex);
+        statctrl_wait_next_request(ctx);
+        ovs_mutex_unlock(&mutex);
+        seq_wait(ctx->thread_seq, new_seq);
+        latch_wait(&ctx->exit_latch);
+
+        poll_block();
+    }
+
+    rconn_destroy(swconn);
+    return NULL;
+}
+
+static void
+statctrl_rconn_setup(struct rconn *swconn, char *br_int)
+    OVS_REQUIRES(mutex)
+{
+    if (!br_int) {
+        rconn_disconnect(swconn);
+        return;
+    }
+    /* The target is the "<br_int>.mgmt" unix socket in the OVS run dir. */
+    char *conn_target = xasprintf("unix:%s/%s.mgmt", ovs_rundir(), br_int);
+    /* Reconnect only when the target differs from the current one. */
+    if (strcmp(conn_target, rconn_get_target(swconn))) {
+        VLOG_INFO("%s: connecting to switch", conn_target);
+        rconn_connect(swconn, conn_target, conn_target);
+    }
+
+    free(conn_target);
+}
+
+static void
+statctrl_handle_rconn_msg(struct rconn *swconn, struct statctrl_ctx *ctx,
+                          struct ofpbuf *msg)
+{
+    enum ofptype type;
+    const struct ofp_header *oh = msg->data;
+    /* NOTE(review): the return value of the decode is not checked. */
+    ofptype_decode(&type, oh);
+
+    if (type == OFPTYPE_ECHO_REQUEST) {
+        rconn_send(swconn, ofputil_encode_echo_reply(oh), NULL);
+    } else if (type == OFPTYPE_FLOW_STATS_REPLY) {
+        enum stat_type stype = statctrl_get_stat_type(ctx, oh);
+        if (stype == STATS_MAX) {
+            return;
+        }
+        /* Decode the reply into the stats list of the matching node. */
+        ovs_mutex_lock(&mutex);
+        statctrl_decode_statistics_reply(&ctx->nodes[stype], msg);
+        ovs_mutex_unlock(&mutex);
+    } else {
+        if (VLOG_IS_DBG_ENABLED()) {
+            /* Any other message is only logged at debug level. */
+            static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(30, 300);
+
+            char *s = ofp_to_string(oh, ntohs(oh->length), NULL, NULL, 2);
+
+            VLOG_DBG_RL(&rl, "OpenFlow packet ignored: %s", s);
+            free(s);
+        }
+    }
+}
+
+/* Returns the stat type whose last request xid matches 'oh', or STATS_MAX. */
+static enum stat_type
+statctrl_get_stat_type(struct statctrl_ctx *ctx, const struct ofp_header *oh)
+{
+    enum stat_type stype = STATS_MAC_BINDING;
+    while (stype < STATS_MAX && ctx->nodes[stype].xid != oh->xid) {
+        stype++;
+    }
+    return stype;
+}
+
+static void
+statctrl_decode_statistics_reply(struct stats_node *node, struct ofpbuf *msg)
+    OVS_REQUIRES(mutex)
+{
+    struct ofpbuf ofpacts;
+    ofpbuf_init(&ofpacts, 0);
+    /* Decode the flow stats in 'msg' one by one until EOF or an error. */
+    while (true) {
+        struct ofputil_flow_stats fs;
+
+        int error = ofputil_decode_flow_stats_reply(&fs, msg, true, &ofpacts);
+        if (error == EOF) {
+            break;
+        } else if (error) {
+            VLOG_DBG("Couldn't parse stat reply: %s", ofperr_to_string(error));
+            break;
+        }
+        /* Hand the decoded entry over to the node. */
+        node->process_flow_stats(&node->stats_list, &fs);
+    }
+
+    ofpbuf_uninit(&ofpacts);
+}
+
+static void
+statctrl_send_request(struct rconn *swconn, struct statctrl_ctx *ctx)
+    OVS_REQUIRES(mutex)
+{
+    long long now = time_msec();
+    enum ofp_version version = rconn_get_version(swconn);
+    enum ofputil_protocol proto = ofputil_protocol_from_ofp_version(version);
+
+    for (size_t i = 0; i < STATS_MAX; i++) {
+        struct stats_node *node = &ctx->nodes[i];
+        /* Skip the nodes whose request is not due yet. */
+        if (now < node->next_request_timestamp) {
+            continue;
+        }
+        /* The request is limited only by the table ID of the node. */
+        struct ofputil_flow_stats_request fsr = {
+                .cookie = htonll(0),
+                .cookie_mask = htonll(0),
+                .out_port = OFPP_ANY,
+                .out_group = OFPG_ANY,
+                .table_id = node->table_id,
+        };
+        struct ofpbuf *msg = ofputil_encode_flow_stats_request(&fsr, proto);
+        node->xid = ((struct ofp_header *) msg->data)->xid;
+        /* Schedule the next request one 'request_delay' from now. */
+        statctrl_update_next_request_timestamp(node, now, 0);
+
+        rconn_send(swconn, msg, NULL);
+    }
+}
+
+static void
+statctrl_notify_main_thread(struct statctrl_ctx *ctx)
+{
+    for (size_t i = 0; i < STATS_MAX; i++) {
+        if (!ovs_list_is_empty(&ctx->nodes[i].stats_list)) {
+            seq_change(ctx->main_seq);
+            return;
+        }
+    }
+}
+
+static void
+statctrl_set_conn_target(const char *br_int_name)
+    OVS_REQUIRES(mutex)
+{
+    if (!br_int_name) {
+        return;
+    }
+
+    /* Store a copy of the name only when it is new or has changed. */
+    if (!statctrl_ctx.br_int || strcmp(statctrl_ctx.br_int, br_int_name)) {
+        free(statctrl_ctx.br_int);
+        statctrl_ctx.br_int = xstrdup(br_int_name);
+        /* Notify statctrl thread that integration bridge is set/changed. */
+        seq_change(statctrl_ctx.thread_seq);
+    }
+}
+
+/* Arms the poll timer for every node that has a request scheduled. */
+static void
+statctrl_wait_next_request(struct statctrl_ctx *ctx)
+    OVS_REQUIRES(mutex)
+{
+    for (size_t i = 0; i < STATS_MAX; i++) {
+        if (ctx->nodes[i].next_request_timestamp < INT64_MAX) {
+            poll_timer_wait_until(ctx->nodes[i].next_request_timestamp);
+        }
+    }
+}
+
+/* Reschedules the next stats request of 'node' from its 'request_delay'. */
+static bool
+statctrl_update_next_request_timestamp(struct stats_node *node,
+                                       long long now, uint64_t prev_delay)
+    OVS_REQUIRES(mutex)
+{
+    if (!node->request_delay) {
+        node->next_request_timestamp = INT64_MAX;
+        return false;
+    }
+    int64_t timestamp = prev_delay ? node->next_request_timestamp : now;
+    node->next_request_timestamp =
+            timestamp + node->request_delay - prev_delay;
+    return timestamp != node->next_request_timestamp;
+}
diff --git a/controller/statctrl.h b/controller/statctrl.h
new file mode 100644
index 000000000..c5cede353
--- /dev/null
+++ b/controller/statctrl.h
@@ -0,0 +1,28 @@ 
+/* Copyright (c) 2023, Red Hat, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef STATCTRL_H
+#define STATCTRL_H
+
+#include "mac_cache.h"
+
+void statctrl_init(void);
+void statctrl_run(struct ovsdb_idl_txn *ovnsb_idl_txn,
+                  struct mac_cache_data *mac_cache_data);
+void statctrl_update(const char *br_int_name);
+void statctrl_wait(struct ovsdb_idl_txn *ovnsb_idl_txn);
+void statctrl_destroy(void);
+
+#endif /* controller/statctrl.h */
diff --git a/tests/ovn.at b/tests/ovn.at
index 7cee8a175..f4c28c57b 100644
--- a/tests/ovn.at
+++ b/tests/ovn.at
@@ -34556,8 +34556,10 @@  AT_CHECK([fetch_column nb:logical_router options name="gw" | grep -q mac_binding
 send_garp hv1 ext1 10
 send_garp hv2 ext2 20
 
-OVS_WAIT_UNTIL([ovn-sbctl list mac_binding | grep -q "192.168.10.10"])
-OVS_WAIT_UNTIL([ovn-sbctl list mac_binding | grep -q "192.168.10.20"])
+wait_row_count mac_binding 1 ip="192.168.10.10"
+wait_row_count mac_binding 1 ip="192.168.10.20"
+
+timestamp=$(fetch_column mac_binding timestamp ip="192.168.10.20")
 
 send_udp hv1 ext1 10
 send_udp hv2 ext2 20
@@ -34566,16 +34568,27 @@  OVS_WAIT_UNTIL([as hv1 ovs-ofctl dump-flows br-int table=79 | grep "192.168.10.1
 OVS_WAIT_UNTIL([as hv2 ovs-ofctl dump-flows br-int table=79 | grep "192.168.10.20" | grep -q "n_packets=1"])
 
 # Set the MAC binding aging threshold
-AT_CHECK([ovn-nbctl set logical_router gw options:mac_binding_age_threshold=1])
-AT_CHECK([fetch_column nb:logical_router options | grep -q mac_binding_age_threshold=1])
+AT_CHECK([ovn-nbctl set logical_router gw options:mac_binding_age_threshold=5])
+AT_CHECK([fetch_column nb:logical_router options | grep -q mac_binding_age_threshold=5])
 AT_CHECK([ovn-nbctl --wait=sb sync])
 
+# Send a few packets for "192.168.10.20" to indicate that it is still in use
+send_udp hv2 ext2 20
+sleep 1
+send_udp hv2 ext2 20
+
 # Set the timeout for OVS_WAIT* functions to 5 seconds
 OVS_CTL_TIMEOUT=5
+OVS_WAIT_UNTIL([
+    test "$timestamp" != "$(fetch_column mac_binding timestamp ip='192.168.10.20')"
+])
+check $(test "$(fetch_column mac_binding timestamp ip='192.168.10.20')" != "")
+
 # Check if the records are removed after some inactivity
 OVS_WAIT_UNTIL([
     test "0" = "$(ovn-sbctl list mac_binding | grep -c '192.168.10.10')"
 ])
+# The second one takes longer because it got refreshed
 OVS_WAIT_UNTIL([
     test "0" = "$(ovn-sbctl list mac_binding | grep -c '192.168.10.20')"
 ])
@@ -35257,7 +35270,7 @@  check ovs-vsctl add-br br-phys
 ovn_attach n1 br-phys 192.168.0.1
 
 dnl Ensure that there are at least 3 openflow connections.
-OVS_WAIT_UNTIL([test "$(grep -c 'negotiated OpenFlow version' hv1/ovs-vswitchd.log)" -eq "3"])
+OVS_WAIT_UNTIL([test "$(grep -c 'negotiated OpenFlow version' hv1/ovs-vswitchd.log)" -eq "4"])
 
 dnl "Wait" 3 times 60 seconds and ensure ovn-controller writes to the
 dnl openflow connections in the meantime.  This should allow ovs-vswitchd