diff mbox series

[ovs-dev,RFC] lib: inc-proc-eng: add the capability to run partial recompute

Message ID ee0aaf4afaefd5a1391c469bd846cbb990cd2af0.1648077653.git.lorenzo.bianconi@redhat.com
State RFC
Headers show
Series [ovs-dev,RFC] lib: inc-proc-eng: add the capability to run partial recompute | expand

Commit Message

Lorenzo Bianconi March 23, 2022, 11:22 p.m. UTC
Introduce the capability to run a recompute just on updated nodes in case
of an incremental processing abort (e.g. if the controller is not able to
write to the SB DB and the change can't be processed incrementally).
This avoids wasting CPU time on nodes that did not change during the
last run.

Signed-off-by: Lorenzo Bianconi <lorenzo.bianconi@redhat.com>
---
 controller/ovn-controller.c | 20 +++++-----
 lib/inc-proc-eng.c          | 73 ++++++++++++++++++++++++++++++++++---
 lib/inc-proc-eng.h          | 12 +++++-
 northd/inc-proc-northd.c    |  8 ++--
 4 files changed, 92 insertions(+), 21 deletions(-)

Comments

Han Zhou March 31, 2022, 5:49 a.m. UTC | #1
On Wed, Mar 23, 2022 at 4:22 PM Lorenzo Bianconi <
lorenzo.bianconi@redhat.com> wrote:
>
> Introduce the capability to run a recompute just on updated nodes in case
> of an incremental processing abort (e.g. if the controller is not able to
> write on the sb db and the change can't be processed incrementally).
> This will avoid wasting CPU time on nodes that are not changed during
> last run.
>
> Signed-off-by: Lorenzo Bianconi <lorenzo.bianconi@redhat.com>

Thanks Lorenzo for the RFC. Here are my comments.

Firstly, I am confused by "recompute just on updated nodes" - if a node was
updated in the last run, it means it has already successfully computed its
output, so why would we need to recompute that node again? I think what we
really want is that, when an engine_run() is aborted, the next run
recomputes the nodes that are STALE or ABORTED, and the nodes that depend
on them (directly or indirectly), instead of recomputing all the nodes.

However, even if we implemented the above, it doesn't seem to help
performance, because the high-cost nodes, such as lflow_output, depend on
almost all the nodes (indirectly), so recomputing any of the nodes would
still trigger a lflow_run(). To solve this, we need to track changes (or
set the state to UNCHANGED if there is no change) when a node is recomputed,
so that the nodes that depend on it can still do incremental processing.

So, I am afraid I didn't understand the approach of this implementation -
why are UPDATED nodes recomputed while STALE nodes are not, and why are the
dependent nodes not considered either?

Could you describe more clearly the problem you are solving with this
patch, and give a real example where lflow_run() is triggered unnecessarily
without this change and is avoided with it?

Thanks,
Han

> ---
>  controller/ovn-controller.c | 20 +++++-----
>  lib/inc-proc-eng.c          | 73 ++++++++++++++++++++++++++++++++++---
>  lib/inc-proc-eng.h          | 12 +++++-
>  northd/inc-proc-northd.c    |  8 ++--
>  4 files changed, 92 insertions(+), 21 deletions(-)
>
> diff --git a/controller/ovn-controller.c b/controller/ovn-controller.c
> index ea5e9df41..0c9b6a162 100644
> --- a/controller/ovn-controller.c
> +++ b/controller/ovn-controller.c
> @@ -557,7 +557,7 @@ update_sb_db(struct ovsdb_idl *ovs_idl, struct
ovsdb_idl *ovnsb_idl,
>      }
>      if (reset_ovnsb_idl_min_index && *reset_ovnsb_idl_min_index) {
>          VLOG_INFO("Resetting southbound database cluster state");
> -        engine_set_force_recompute(true);
> +        engine_request_recompute(EN_RECOMPUTE_FULL);
>          ovsdb_idl_reset_min_index(ovnsb_idl);
>          *reset_ovnsb_idl_min_index = false;
>      }
> @@ -3066,7 +3066,7 @@ check_northd_version(struct ovsdb_idl *ovs_idl,
struct ovsdb_idl *ovnsb_idl,
>       * full recompute.
>       */
>      if (version_mismatch) {
> -        engine_set_force_recompute(true);
> +        engine_request_recompute(EN_RECOMPUTE_FULL);
>      }
>      version_mismatch = false;
>      return true;
> @@ -3524,7 +3524,7 @@ main(int argc, char *argv[])
>          if (new_ovs_cond_seqno != ovs_cond_seqno) {
>              if (!new_ovs_cond_seqno) {
>                  VLOG_INFO("OVS IDL reconnected, force recompute.");
> -                engine_set_force_recompute(true);
> +                engine_request_recompute(EN_RECOMPUTE_FULL);
>              }
>              ovs_cond_seqno = new_ovs_cond_seqno;
>          }
> @@ -3542,7 +3542,7 @@ main(int argc, char *argv[])
>          if (new_ovnsb_cond_seqno != ovnsb_cond_seqno) {
>              if (!new_ovnsb_cond_seqno) {
>                  VLOG_INFO("OVNSB IDL reconnected, force recompute.");
> -                engine_set_force_recompute(true);
> +                engine_request_recompute(EN_RECOMPUTE_FULL);
>                  vif_plug_reset_idl_prime_counter();
>              }
>              ovnsb_cond_seqno = new_ovnsb_cond_seqno;
> @@ -3620,7 +3620,7 @@ main(int argc, char *argv[])
>                                             &br_int_dp->capabilities :
NULL,
>                                             br_int ? br_int->name :
NULL)) {
>                  VLOG_INFO("OVS feature set changed, force recompute.");
> -                engine_set_force_recompute(true);
> +                engine_request_recompute(EN_RECOMPUTE_FULL);
>              }
>
>              if (br_int) {
> @@ -3815,7 +3815,7 @@ main(int argc, char *argv[])
>                  if (engine_need_run()) {
>                      VLOG_DBG("engine did not run, force recompute next
time: "
>                               "br_int %p, chassis %p", br_int, chassis);
> -                    engine_set_force_recompute(true);
> +                    engine_request_recompute(EN_RECOMPUTE_PARTIAL);
>                      poll_immediate_wake();
>                  } else {
>                      VLOG_DBG("engine did not run, and it was not needed"
> @@ -3825,10 +3825,10 @@ main(int argc, char *argv[])
>              } else if (engine_aborted()) {
>                  VLOG_DBG("engine was aborted, force recompute next time:
"
>                           "br_int %p, chassis %p", br_int, chassis);
> -                engine_set_force_recompute(true);
> +                engine_request_recompute(EN_RECOMPUTE_PARTIAL);
>                  poll_immediate_wake();
>              } else {
> -                engine_set_force_recompute(false);
> +                engine_request_recompute(EN_RECOMPUTE_NONE);
>              }
>
>              store_nb_cfg(ovnsb_idl_txn, ovs_idl_txn, chassis_private,
> @@ -3882,7 +3882,7 @@ main(int argc, char *argv[])
>
>          if (!ovsdb_idl_loop_commit_and_wait(&ovnsb_idl_loop)) {
>              VLOG_INFO("OVNSB commit failed, force recompute next time.");
> -            engine_set_force_recompute(true);
> +            engine_request_recompute(EN_RECOMPUTE_FULL);
>          }
>
>          int ovs_txn_status =
ovsdb_idl_loop_commit_and_wait(&ovs_idl_loop);
> @@ -4202,7 +4202,7 @@ lflow_cache_flush_cmd(struct unixctl_conn *conn
OVS_UNUSED,
>      VLOG_INFO("User triggered lflow cache flush.");
>      struct lflow_output_persistent_data *fo_pd = arg_;
>      lflow_cache_flush(fo_pd->lflow_cache);
> -    engine_set_force_recompute(true);
> +    engine_request_recompute(EN_RECOMPUTE_FULL);
>      poll_immediate_wake();
>      unixctl_command_reply(conn, NULL);
>  }
> diff --git a/lib/inc-proc-eng.c b/lib/inc-proc-eng.c
> index 7b4391700..3b044598c 100644
> --- a/lib/inc-proc-eng.c
> +++ b/lib/inc-proc-eng.c
> @@ -33,7 +33,7 @@
>
>  VLOG_DEFINE_THIS_MODULE(inc_proc_eng);
>
> -static bool engine_force_recompute = false;
> +static enum engine_recompute_request  recompute_request =
EN_RECOMPUTE_NONE;
>  static bool engine_run_aborted = false;
>  static const struct engine_context *engine_context;
>
> @@ -47,6 +47,12 @@ static const char
*engine_node_state_name[EN_STATE_MAX] = {
>      [EN_ABORTED]   = "Aborted",
>  };
>
> +static const char *engine_recompute_request_name[EN_RECOMPUTE_MAX] = {
> +    [EN_RECOMPUTE_NONE]     = "None",
> +    [EN_RECOMPUTE_PARTIAL]  = "Partial",
> +    [EN_RECOMPUTE_FULL]     = "Full",
> +};
> +
>  static long long engine_compute_log_timeout_msec = 500;
>
>  static void
> @@ -54,9 +60,28 @@ engine_recompute(struct engine_node *node, bool
allowed,
>                   const char *reason_fmt, ...) OVS_PRINTF_FORMAT(3, 4);
>
>  void
> -engine_set_force_recompute(bool val)
> +engine_request_recompute(enum engine_recompute_request val)
>  {
> -    engine_force_recompute = val;
> +    if (val == EN_RECOMPUTE_PARTIAL &&
> +        recompute_request == EN_RECOMPUTE_FULL) {
> +        /* pending EN_RECOMPUTE_FULL already requested. */
> +        return;
> +    }
> +
> +    /* EN_RECOMPUTE_FULL is allowed to overwrite EN_RECOMPUTE_PARTIAL. */
> +    recompute_request = val;
> +    VLOG_DBG("Requested recompute %s",
engine_recompute_request_name[val]);
> +
> +    for (size_t i = 0; i < engine_n_nodes; i++) {
> +        if (recompute_request == EN_RECOMPUTE_PARTIAL) {
> +            if (engine_nodes[i]->state == EN_UPDATED ||
> +                engine_nodes[i]->state == EN_ABORTED) {
> +                engine_nodes[i]->pending_recompute = true;
> +            }
> +        } else {
> +            engine_nodes[i]->pending_recompute = false;
> +        }
> +    }
>  }
>
>  const struct engine_context *
> @@ -428,6 +453,34 @@ engine_compute(struct engine_node *node, bool
recompute_allowed)
>      return true;
>  }
>
> +static bool
> +engine_run_partial(struct engine_node *node, bool recompute_allowed)
> +{
> +    /* Let's try to do a selective recompute on pending changes before
> +     * performing a full recompute.
> +     */
> +    if (recompute_request != EN_RECOMPUTE_PARTIAL) {
> +        return false;
> +    }
> +
> +    if (node->pending_recompute) {
> +        goto recompute;
> +    }
> +
> +    for (size_t i = 0; i < node->n_inputs; i++) {
> +        if (node->inputs[i].node->pending_recompute) {
> +            goto recompute;
> +        }
> +    }
> +    return false;
> +
> +recompute:
> +    node->pending_recompute = false;
> +    engine_recompute(node, recompute_allowed, "selective recompute on
%s",
> +                     node->name);
> +    return true;
> +}
> +
>  static void
>  engine_run_node(struct engine_node *node, bool recompute_allowed)
>  {
> @@ -438,11 +491,15 @@ engine_run_node(struct engine_node *node, bool
recompute_allowed)
>          return;
>      }
>
> -    if (engine_force_recompute) {
> +    if (recompute_request == EN_RECOMPUTE_FULL) {
>          engine_recompute(node, recompute_allowed, "forced");
>          return;
>      }
>
> +    if (engine_run_partial(node, recompute_allowed)) {
> +        return;
> +    }
> +
>      /* If any of the inputs updated data but there is no change_handler,
then
>       * recompute the current node too.
>       */
> @@ -495,9 +552,13 @@ engine_run(bool recompute_allowed)
>          if (engine_nodes[i]->state == EN_ABORTED) {
>              engine_nodes[i]->stats.abort++;
>              engine_run_aborted = true;
> -            return;
> +            break;
>          }
>      }
> +
> +    if (recompute_request == EN_RECOMPUTE_PARTIAL) {
> +        recompute_request = EN_RECOMPUTE_NONE;
> +    }
>  }
>
>  bool
> @@ -524,6 +585,6 @@ void
>  engine_trigger_recompute(void)
>  {
>      VLOG_INFO("User triggered force recompute.");
> -    engine_set_force_recompute(true);
> +    engine_request_recompute(EN_RECOMPUTE_FULL);
>      poll_immediate_wake();
>  }
> diff --git a/lib/inc-proc-eng.h b/lib/inc-proc-eng.h
> index 9bfab1f7c..7b734e666 100644
> --- a/lib/inc-proc-eng.h
> +++ b/lib/inc-proc-eng.h
> @@ -115,6 +115,13 @@ enum engine_node_state {
>      EN_STATE_MAX,
>  };
>
> +enum engine_recompute_request {
> +    EN_RECOMPUTE_NONE,      /* No recompute is necessary. */
> +    EN_RECOMPUTE_PARTIAL,   /* Partial recompute on updated node
requested. */
> +    EN_RECOMPUTE_FULL,      /* Full recompute requested. */
> +    EN_RECOMPUTE_MAX,
> +};
> +
>  struct engine_stats {
>      uint64_t recompute;
>      uint64_t compute;
> @@ -141,6 +148,9 @@ struct engine_node {
>      /* State of the node after the last engine run. */
>      enum engine_node_state state;
>
> +    /* This node needs to be processed by the upcoming partial
recompute. */
> +    bool pending_recompute;
> +
>      /* Method to allocate and initialize node data. It may be NULL.
>       * The user supplied argument 'arg' is passed from the call to
>       * engine_init().
> @@ -216,7 +226,7 @@ void engine_add_input(struct engine_node *node,
struct engine_node *input,
>   * in circumstances when we are not sure there is change or not, or
>   * when there is change but the engine couldn't be executed in that
>   * iteration, and the change can't be tracked across iterations */
> -void engine_set_force_recompute(bool val);
> +void engine_request_recompute(enum engine_recompute_request val);
>
>  /* Return the current engine_context. The values in the context can be
NULL
>   * if the engine is run with allow_recompute == false in the current
> diff --git a/northd/inc-proc-northd.c b/northd/inc-proc-northd.c
> index af55221e3..63c7faa7d 100644
> --- a/northd/inc-proc-northd.c
> +++ b/northd/inc-proc-northd.c
> @@ -258,7 +258,7 @@ void inc_proc_northd_run(struct ovsdb_idl_txn
*ovnnb_txn,
>       * force-recompute request if 'recompute' is false.
>       */
>      if (recompute) {
> -        engine_set_force_recompute(recompute);
> +        engine_request_recompute(EN_RECOMPUTE_FULL);
>      }
>
>      struct engine_context eng_ctx = {
> @@ -275,17 +275,17 @@ void inc_proc_northd_run(struct ovsdb_idl_txn
*ovnnb_txn,
>      if (!engine_has_run()) {
>          if (engine_need_run()) {
>              VLOG_DBG("engine did not run, force recompute next time.");
> -            engine_set_force_recompute(true);
> +            engine_request_recompute(EN_RECOMPUTE_FULL);
>              poll_immediate_wake();
>          } else {
>              VLOG_DBG("engine did not run, and it was not needed");
>          }
>      } else if (engine_aborted()) {
>          VLOG_DBG("engine was aborted, force recompute next time.");
> -        engine_set_force_recompute(true);
> +        engine_request_recompute(EN_RECOMPUTE_FULL);
>          poll_immediate_wake();
>      } else {
> -        engine_set_force_recompute(false);
> +        engine_request_recompute(EN_RECOMPUTE_NONE);
>      }
>  }
>
> --
> 2.35.1
>
Lorenzo Bianconi April 13, 2022, 10 p.m. UTC | #2
> On Wed, Mar 23, 2022 at 4:22 PM Lorenzo Bianconi <
> lorenzo.bianconi@redhat.com> wrote:
> >
> > Introduce the capability to run a recompute just on updated nodes in case
> > of an incremental processing abort (e.g. if the controller is not able to
> > write on the sb db and the change can't be processed incrementally).
> > This will avoid wasting CPU time on nodes that are not changed during
> > last run.
> >
> > Signed-off-by: Lorenzo Bianconi <lorenzo.bianconi@redhat.com>
> 
> Thanks Lorenzo for the RFC. Here are my comments.

Hi Han,

thx for the review :)

> 
> Firstly, I am confused by "recompute just on updated nodes" - if a node is
> updated in the last run, it means it has already computed successfully its
> output, then why need to recompute on that node again? I think what we
> really want to do is that when an engine_run() is aborted, in the next run
> we want to recompute the nodes that are STALE or ABORTED, and the nodes
> that depend on (directly or indirectly) these nodes, instead of recomputing
> all the nodes.

Reviewing it, I agree: we should not rely on the EN_UPDATED state to run
engine_recompute().
We should probably just recompute the aborted nodes and their directly
connected inputs on the next run, but we should avoid breaking on the first
aborted node in engine_run().
Do you think this approach will end up recomputing all the nodes (or the
high-cost ones, e.g. lflow) since the graph is highly connected?

> 
> However, even if we implemented the above, it doesn't seem to help for the
> performance, because the high cost nodes, such as lflow_output, depend on
> almost all the nodes (indirectly), so any recompute of the nodes would
> still trigger a lflow_run(). To solve this, we need to track changes (or
> set state UNCHANGED if there is no change) when a node is recomputed, so
> that the nodes depend on it can still do incremental processing.

Do you think it is possible to keep track of the changed DB information
across multiple runs in the IDL layer, so that the data can be re-processed
incrementally in the next run?

Regards,
Lorenzo

> 
> So, I am afraid I didn't understand the approach of this implementation -
> why would UPDATED nodes are recomputed and STALE node are not, and the
> dependant nodes are not considered, either.
> 
> Could you describe more clearly the problem you are solving with this
> patch, and give a real example that without this change it is triggering
> lflow_run() unnecessarily and with this change it is avoided?
> 
> Thanks,
> Han
> 
> > ---
> >  controller/ovn-controller.c | 20 +++++-----
> >  lib/inc-proc-eng.c          | 73 ++++++++++++++++++++++++++++++++++---
> >  lib/inc-proc-eng.h          | 12 +++++-
> >  northd/inc-proc-northd.c    |  8 ++--
> >  4 files changed, 92 insertions(+), 21 deletions(-)
> >
> > diff --git a/controller/ovn-controller.c b/controller/ovn-controller.c
> > index ea5e9df41..0c9b6a162 100644
> > --- a/controller/ovn-controller.c
> > +++ b/controller/ovn-controller.c
> > @@ -557,7 +557,7 @@ update_sb_db(struct ovsdb_idl *ovs_idl, struct
> ovsdb_idl *ovnsb_idl,
> >      }
> >      if (reset_ovnsb_idl_min_index && *reset_ovnsb_idl_min_index) {
> >          VLOG_INFO("Resetting southbound database cluster state");
> > -        engine_set_force_recompute(true);
> > +        engine_request_recompute(EN_RECOMPUTE_FULL);
> >          ovsdb_idl_reset_min_index(ovnsb_idl);
> >          *reset_ovnsb_idl_min_index = false;
> >      }
> > @@ -3066,7 +3066,7 @@ check_northd_version(struct ovsdb_idl *ovs_idl,
> struct ovsdb_idl *ovnsb_idl,
> >       * full recompute.
> >       */
> >      if (version_mismatch) {
> > -        engine_set_force_recompute(true);
> > +        engine_request_recompute(EN_RECOMPUTE_FULL);
> >      }
> >      version_mismatch = false;
> >      return true;
> > @@ -3524,7 +3524,7 @@ main(int argc, char *argv[])
> >          if (new_ovs_cond_seqno != ovs_cond_seqno) {
> >              if (!new_ovs_cond_seqno) {
> >                  VLOG_INFO("OVS IDL reconnected, force recompute.");
> > -                engine_set_force_recompute(true);
> > +                engine_request_recompute(EN_RECOMPUTE_FULL);
> >              }
> >              ovs_cond_seqno = new_ovs_cond_seqno;
> >          }
> > @@ -3542,7 +3542,7 @@ main(int argc, char *argv[])
> >          if (new_ovnsb_cond_seqno != ovnsb_cond_seqno) {
> >              if (!new_ovnsb_cond_seqno) {
> >                  VLOG_INFO("OVNSB IDL reconnected, force recompute.");
> > -                engine_set_force_recompute(true);
> > +                engine_request_recompute(EN_RECOMPUTE_FULL);
> >                  vif_plug_reset_idl_prime_counter();
> >              }
> >              ovnsb_cond_seqno = new_ovnsb_cond_seqno;
> > @@ -3620,7 +3620,7 @@ main(int argc, char *argv[])
> >                                             &br_int_dp->capabilities :
> NULL,
> >                                             br_int ? br_int->name :
> NULL)) {
> >                  VLOG_INFO("OVS feature set changed, force recompute.");
> > -                engine_set_force_recompute(true);
> > +                engine_request_recompute(EN_RECOMPUTE_FULL);
> >              }
> >
> >              if (br_int) {
> > @@ -3815,7 +3815,7 @@ main(int argc, char *argv[])
> >                  if (engine_need_run()) {
> >                      VLOG_DBG("engine did not run, force recompute next
> time: "
> >                               "br_int %p, chassis %p", br_int, chassis);
> > -                    engine_set_force_recompute(true);
> > +                    engine_request_recompute(EN_RECOMPUTE_PARTIAL);
> >                      poll_immediate_wake();
> >                  } else {
> >                      VLOG_DBG("engine did not run, and it was not needed"
> > @@ -3825,10 +3825,10 @@ main(int argc, char *argv[])
> >              } else if (engine_aborted()) {
> >                  VLOG_DBG("engine was aborted, force recompute next time:
> "
> >                           "br_int %p, chassis %p", br_int, chassis);
> > -                engine_set_force_recompute(true);
> > +                engine_request_recompute(EN_RECOMPUTE_PARTIAL);
> >                  poll_immediate_wake();
> >              } else {
> > -                engine_set_force_recompute(false);
> > +                engine_request_recompute(EN_RECOMPUTE_NONE);
> >              }
> >
> >              store_nb_cfg(ovnsb_idl_txn, ovs_idl_txn, chassis_private,
> > @@ -3882,7 +3882,7 @@ main(int argc, char *argv[])
> >
> >          if (!ovsdb_idl_loop_commit_and_wait(&ovnsb_idl_loop)) {
> >              VLOG_INFO("OVNSB commit failed, force recompute next time.");
> > -            engine_set_force_recompute(true);
> > +            engine_request_recompute(EN_RECOMPUTE_FULL);
> >          }
> >
> >          int ovs_txn_status =
> ovsdb_idl_loop_commit_and_wait(&ovs_idl_loop);
> > @@ -4202,7 +4202,7 @@ lflow_cache_flush_cmd(struct unixctl_conn *conn
> OVS_UNUSED,
> >      VLOG_INFO("User triggered lflow cache flush.");
> >      struct lflow_output_persistent_data *fo_pd = arg_;
> >      lflow_cache_flush(fo_pd->lflow_cache);
> > -    engine_set_force_recompute(true);
> > +    engine_request_recompute(EN_RECOMPUTE_FULL);
> >      poll_immediate_wake();
> >      unixctl_command_reply(conn, NULL);
> >  }
> > diff --git a/lib/inc-proc-eng.c b/lib/inc-proc-eng.c
> > index 7b4391700..3b044598c 100644
> > --- a/lib/inc-proc-eng.c
> > +++ b/lib/inc-proc-eng.c
> > @@ -33,7 +33,7 @@
> >
> >  VLOG_DEFINE_THIS_MODULE(inc_proc_eng);
> >
> > -static bool engine_force_recompute = false;
> > +static enum engine_recompute_request  recompute_request =
> EN_RECOMPUTE_NONE;
> >  static bool engine_run_aborted = false;
> >  static const struct engine_context *engine_context;
> >
> > @@ -47,6 +47,12 @@ static const char
> *engine_node_state_name[EN_STATE_MAX] = {
> >      [EN_ABORTED]   = "Aborted",
> >  };
> >
> > +static const char *engine_recompute_request_name[EN_RECOMPUTE_MAX] = {
> > +    [EN_RECOMPUTE_NONE]     = "None",
> > +    [EN_RECOMPUTE_PARTIAL]  = "Partial",
> > +    [EN_RECOMPUTE_FULL]     = "Full",
> > +};
> > +
> >  static long long engine_compute_log_timeout_msec = 500;
> >
> >  static void
> > @@ -54,9 +60,28 @@ engine_recompute(struct engine_node *node, bool
> allowed,
> >                   const char *reason_fmt, ...) OVS_PRINTF_FORMAT(3, 4);
> >
> >  void
> > -engine_set_force_recompute(bool val)
> > +engine_request_recompute(enum engine_recompute_request val)
> >  {
> > -    engine_force_recompute = val;
> > +    if (val == EN_RECOMPUTE_PARTIAL &&
> > +        recompute_request == EN_RECOMPUTE_FULL) {
> > +        /* pending EN_RECOMPUTE_FULL already requested. */
> > +        return;
> > +    }
> > +
> > +    /* EN_RECOMPUTE_FULL is allowed to overwrite EN_RECOMPUTE_PARTIAL. */
> > +    recompute_request = val;
> > +    VLOG_DBG("Requested recompute %s",
> engine_recompute_request_name[val]);
> > +
> > +    for (size_t i = 0; i < engine_n_nodes; i++) {
> > +        if (recompute_request == EN_RECOMPUTE_PARTIAL) {
> > +            if (engine_nodes[i]->state == EN_UPDATED ||
> > +                engine_nodes[i]->state == EN_ABORTED) {
> > +                engine_nodes[i]->pending_recompute = true;
> > +            }
> > +        } else {
> > +            engine_nodes[i]->pending_recompute = false;
> > +        }
> > +    }
> >  }
> >
> >  const struct engine_context *
> > @@ -428,6 +453,34 @@ engine_compute(struct engine_node *node, bool
> recompute_allowed)
> >      return true;
> >  }
> >
> > +static bool
> > +engine_run_partial(struct engine_node *node, bool recompute_allowed)
> > +{
> > +    /* Let's try to do a selective recompute on pending changes before
> > +     * performing a full recompute.
> > +     */
> > +    if (recompute_request != EN_RECOMPUTE_PARTIAL) {
> > +        return false;
> > +    }
> > +
> > +    if (node->pending_recompute) {
> > +        goto recompute;
> > +    }
> > +
> > +    for (size_t i = 0; i < node->n_inputs; i++) {
> > +        if (node->inputs[i].node->pending_recompute) {
> > +            goto recompute;
> > +        }
> > +    }
> > +    return false;
> > +
> > +recompute:
> > +    node->pending_recompute = false;
> > +    engine_recompute(node, recompute_allowed, "selective recompute on
> %s",
> > +                     node->name);
> > +    return true;
> > +}
> > +
> >  static void
> >  engine_run_node(struct engine_node *node, bool recompute_allowed)
> >  {
> > @@ -438,11 +491,15 @@ engine_run_node(struct engine_node *node, bool
> recompute_allowed)
> >          return;
> >      }
> >
> > -    if (engine_force_recompute) {
> > +    if (recompute_request == EN_RECOMPUTE_FULL) {
> >          engine_recompute(node, recompute_allowed, "forced");
> >          return;
> >      }
> >
> > +    if (engine_run_partial(node, recompute_allowed)) {
> > +        return;
> > +    }
> > +
> >      /* If any of the inputs updated data but there is no change_handler,
> then
> >       * recompute the current node too.
> >       */
> > @@ -495,9 +552,13 @@ engine_run(bool recompute_allowed)
> >          if (engine_nodes[i]->state == EN_ABORTED) {
> >              engine_nodes[i]->stats.abort++;
> >              engine_run_aborted = true;
> > -            return;
> > +            break;
> >          }
> >      }
> > +
> > +    if (recompute_request == EN_RECOMPUTE_PARTIAL) {
> > +        recompute_request = EN_RECOMPUTE_NONE;
> > +    }
> >  }
> >
> >  bool
> > @@ -524,6 +585,6 @@ void
> >  engine_trigger_recompute(void)
> >  {
> >      VLOG_INFO("User triggered force recompute.");
> > -    engine_set_force_recompute(true);
> > +    engine_request_recompute(EN_RECOMPUTE_FULL);
> >      poll_immediate_wake();
> >  }
> > diff --git a/lib/inc-proc-eng.h b/lib/inc-proc-eng.h
> > index 9bfab1f7c..7b734e666 100644
> > --- a/lib/inc-proc-eng.h
> > +++ b/lib/inc-proc-eng.h
> > @@ -115,6 +115,13 @@ enum engine_node_state {
> >      EN_STATE_MAX,
> >  };
> >
> > +enum engine_recompute_request {
> > +    EN_RECOMPUTE_NONE,      /* No recompute is necessary. */
> > +    EN_RECOMPUTE_PARTIAL,   /* Partial recompute on updated node
> requested. */
> > +    EN_RECOMPUTE_FULL,      /* Full recompute requested. */
> > +    EN_RECOMPUTE_MAX,
> > +};
> > +
> >  struct engine_stats {
> >      uint64_t recompute;
> >      uint64_t compute;
> > @@ -141,6 +148,9 @@ struct engine_node {
> >      /* State of the node after the last engine run. */
> >      enum engine_node_state state;
> >
> > +    /* This node needs to be processed by the upcoming partial
> recompute. */
> > +    bool pending_recompute;
> > +
> >      /* Method to allocate and initialize node data. It may be NULL.
> >       * The user supplied argument 'arg' is passed from the call to
> >       * engine_init().
> > @@ -216,7 +226,7 @@ void engine_add_input(struct engine_node *node,
> struct engine_node *input,
> >   * in circumstances when we are not sure there is change or not, or
> >   * when there is change but the engine couldn't be executed in that
> >   * iteration, and the change can't be tracked across iterations */
> > -void engine_set_force_recompute(bool val);
> > +void engine_request_recompute(enum engine_recompute_request val);
> >
> >  /* Return the current engine_context. The values in the context can be
> NULL
> >   * if the engine is run with allow_recompute == false in the current
> > diff --git a/northd/inc-proc-northd.c b/northd/inc-proc-northd.c
> > index af55221e3..63c7faa7d 100644
> > --- a/northd/inc-proc-northd.c
> > +++ b/northd/inc-proc-northd.c
> > @@ -258,7 +258,7 @@ void inc_proc_northd_run(struct ovsdb_idl_txn
> *ovnnb_txn,
> >       * force-recompute request if 'recompute' is false.
> >       */
> >      if (recompute) {
> > -        engine_set_force_recompute(recompute);
> > +        engine_request_recompute(EN_RECOMPUTE_FULL);
> >      }
> >
> >      struct engine_context eng_ctx = {
> > @@ -275,17 +275,17 @@ void inc_proc_northd_run(struct ovsdb_idl_txn
> *ovnnb_txn,
> >      if (!engine_has_run()) {
> >          if (engine_need_run()) {
> >              VLOG_DBG("engine did not run, force recompute next time.");
> > -            engine_set_force_recompute(true);
> > +            engine_request_recompute(EN_RECOMPUTE_FULL);
> >              poll_immediate_wake();
> >          } else {
> >              VLOG_DBG("engine did not run, and it was not needed");
> >          }
> >      } else if (engine_aborted()) {
> >          VLOG_DBG("engine was aborted, force recompute next time.");
> > -        engine_set_force_recompute(true);
> > +        engine_request_recompute(EN_RECOMPUTE_FULL);
> >          poll_immediate_wake();
> >      } else {
> > -        engine_set_force_recompute(false);
> > +        engine_request_recompute(EN_RECOMPUTE_NONE);
> >      }
> >  }
> >
> > --
> > 2.35.1
> >
diff mbox series

Patch

diff --git a/controller/ovn-controller.c b/controller/ovn-controller.c
index ea5e9df41..0c9b6a162 100644
--- a/controller/ovn-controller.c
+++ b/controller/ovn-controller.c
@@ -557,7 +557,7 @@  update_sb_db(struct ovsdb_idl *ovs_idl, struct ovsdb_idl *ovnsb_idl,
     }
     if (reset_ovnsb_idl_min_index && *reset_ovnsb_idl_min_index) {
         VLOG_INFO("Resetting southbound database cluster state");
-        engine_set_force_recompute(true);
+        engine_request_recompute(EN_RECOMPUTE_FULL);
         ovsdb_idl_reset_min_index(ovnsb_idl);
         *reset_ovnsb_idl_min_index = false;
     }
@@ -3066,7 +3066,7 @@  check_northd_version(struct ovsdb_idl *ovs_idl, struct ovsdb_idl *ovnsb_idl,
      * full recompute.
      */
     if (version_mismatch) {
-        engine_set_force_recompute(true);
+        engine_request_recompute(EN_RECOMPUTE_FULL);
     }
     version_mismatch = false;
     return true;
@@ -3524,7 +3524,7 @@  main(int argc, char *argv[])
         if (new_ovs_cond_seqno != ovs_cond_seqno) {
             if (!new_ovs_cond_seqno) {
                 VLOG_INFO("OVS IDL reconnected, force recompute.");
-                engine_set_force_recompute(true);
+                engine_request_recompute(EN_RECOMPUTE_FULL);
             }
             ovs_cond_seqno = new_ovs_cond_seqno;
         }
@@ -3542,7 +3542,7 @@  main(int argc, char *argv[])
         if (new_ovnsb_cond_seqno != ovnsb_cond_seqno) {
             if (!new_ovnsb_cond_seqno) {
                 VLOG_INFO("OVNSB IDL reconnected, force recompute.");
-                engine_set_force_recompute(true);
+                engine_request_recompute(EN_RECOMPUTE_FULL);
                 vif_plug_reset_idl_prime_counter();
             }
             ovnsb_cond_seqno = new_ovnsb_cond_seqno;
@@ -3620,7 +3620,7 @@  main(int argc, char *argv[])
                                            &br_int_dp->capabilities : NULL,
                                            br_int ? br_int->name : NULL)) {
                 VLOG_INFO("OVS feature set changed, force recompute.");
-                engine_set_force_recompute(true);
+                engine_request_recompute(EN_RECOMPUTE_FULL);
             }
 
             if (br_int) {
@@ -3815,7 +3815,7 @@  main(int argc, char *argv[])
                 if (engine_need_run()) {
                     VLOG_DBG("engine did not run, force recompute next time: "
                              "br_int %p, chassis %p", br_int, chassis);
-                    engine_set_force_recompute(true);
+                    engine_request_recompute(EN_RECOMPUTE_PARTIAL);
                     poll_immediate_wake();
                 } else {
                     VLOG_DBG("engine did not run, and it was not needed"
@@ -3825,10 +3825,10 @@  main(int argc, char *argv[])
             } else if (engine_aborted()) {
                 VLOG_DBG("engine was aborted, force recompute next time: "
                          "br_int %p, chassis %p", br_int, chassis);
-                engine_set_force_recompute(true);
+                engine_request_recompute(EN_RECOMPUTE_PARTIAL);
                 poll_immediate_wake();
             } else {
-                engine_set_force_recompute(false);
+                engine_request_recompute(EN_RECOMPUTE_NONE);
             }
 
             store_nb_cfg(ovnsb_idl_txn, ovs_idl_txn, chassis_private,
@@ -3882,7 +3882,7 @@  main(int argc, char *argv[])
 
         if (!ovsdb_idl_loop_commit_and_wait(&ovnsb_idl_loop)) {
             VLOG_INFO("OVNSB commit failed, force recompute next time.");
-            engine_set_force_recompute(true);
+            engine_request_recompute(EN_RECOMPUTE_FULL);
         }
 
         int ovs_txn_status = ovsdb_idl_loop_commit_and_wait(&ovs_idl_loop);
@@ -4202,7 +4202,7 @@  lflow_cache_flush_cmd(struct unixctl_conn *conn OVS_UNUSED,
     VLOG_INFO("User triggered lflow cache flush.");
     struct lflow_output_persistent_data *fo_pd = arg_;
     lflow_cache_flush(fo_pd->lflow_cache);
-    engine_set_force_recompute(true);
+    engine_request_recompute(EN_RECOMPUTE_FULL);
     poll_immediate_wake();
     unixctl_command_reply(conn, NULL);
 }
diff --git a/lib/inc-proc-eng.c b/lib/inc-proc-eng.c
index 7b4391700..3b044598c 100644
--- a/lib/inc-proc-eng.c
+++ b/lib/inc-proc-eng.c
@@ -33,7 +33,7 @@ 
 
 VLOG_DEFINE_THIS_MODULE(inc_proc_eng);
 
-static bool engine_force_recompute = false;
+static enum engine_recompute_request  recompute_request = EN_RECOMPUTE_NONE;
 static bool engine_run_aborted = false;
 static const struct engine_context *engine_context;
 
@@ -47,6 +47,12 @@  static const char *engine_node_state_name[EN_STATE_MAX] = {
     [EN_ABORTED]   = "Aborted",
 };
 
+static const char *engine_recompute_request_name[EN_RECOMPUTE_MAX] = {
+    [EN_RECOMPUTE_NONE]     = "None",
+    [EN_RECOMPUTE_PARTIAL]  = "Partial",
+    [EN_RECOMPUTE_FULL]     = "Full",
+};
+
 static long long engine_compute_log_timeout_msec = 500;
 
 static void
@@ -54,9 +60,28 @@  engine_recompute(struct engine_node *node, bool allowed,
                  const char *reason_fmt, ...) OVS_PRINTF_FORMAT(3, 4);
 
 void
-engine_set_force_recompute(bool val)
+engine_request_recompute(enum engine_recompute_request val)
 {
-    engine_force_recompute = val;
+    if (val == EN_RECOMPUTE_PARTIAL &&
+        recompute_request == EN_RECOMPUTE_FULL) {
+        /* A pending full recompute is never downgraded to a partial one. */
+        return;
+    }
+
+    /* Any other request, including EN_RECOMPUTE_NONE, replaces the old one. */
+    recompute_request = val;
+    VLOG_DBG("Requested recompute %s", engine_recompute_request_name[val]);
+
+    for (size_t i = 0; i < engine_n_nodes; i++) {
+        if (recompute_request == EN_RECOMPUTE_PARTIAL) {
+            if (engine_nodes[i]->state == EN_UPDATED ||
+                engine_nodes[i]->state == EN_ABORTED) {
+                engine_nodes[i]->pending_recompute = true;
+            }
+        } else {
+            engine_nodes[i]->pending_recompute = false;
+        }
+    }
 }
 
 const struct engine_context *
@@ -428,6 +453,34 @@  engine_compute(struct engine_node *node, bool recompute_allowed)
     return true;
 }
 
+static bool
+engine_run_partial(struct engine_node *node, bool recompute_allowed)
+{
+    /* Selective recompute only applies while an EN_RECOMPUTE_PARTIAL request
+     * is pending; returning false tells the caller the node was not handled.
+     */
+    if (recompute_request != EN_RECOMPUTE_PARTIAL) {
+        return false;
+    }
+
+    if (node->pending_recompute) {
+        goto recompute;
+    }
+
+    for (size_t i = 0; i < node->n_inputs; i++) {
+        if (node->inputs[i].node->pending_recompute) {
+            goto recompute;
+        }
+    }
+    return false;
+
+recompute:
+    node->pending_recompute = false;
+    engine_recompute(node, recompute_allowed, "selective recompute on %s",
+                     node->name);
+    return true;
+}
+
 static void
 engine_run_node(struct engine_node *node, bool recompute_allowed)
 {
@@ -438,11 +491,15 @@  engine_run_node(struct engine_node *node, bool recompute_allowed)
         return;
     }
 
-    if (engine_force_recompute) {
+    if (recompute_request == EN_RECOMPUTE_FULL) {
         engine_recompute(node, recompute_allowed, "forced");
         return;
     }
 
+    if (engine_run_partial(node, recompute_allowed)) {
+        return;
+    }
+
     /* If any of the inputs updated data but there is no change_handler, then
      * recompute the current node too.
      */
@@ -495,9 +552,13 @@  engine_run(bool recompute_allowed)
         if (engine_nodes[i]->state == EN_ABORTED) {
             engine_nodes[i]->stats.abort++;
             engine_run_aborted = true;
-            return;
+            break;
         }
     }
+
+    if (recompute_request == EN_RECOMPUTE_PARTIAL) {
+        recompute_request = EN_RECOMPUTE_NONE;
+    }
 }
 
 bool
@@ -524,6 +585,6 @@  void
 engine_trigger_recompute(void)
 {
     VLOG_INFO("User triggered force recompute.");
-    engine_set_force_recompute(true);
+    engine_request_recompute(EN_RECOMPUTE_FULL);
     poll_immediate_wake();
 }
diff --git a/lib/inc-proc-eng.h b/lib/inc-proc-eng.h
index 9bfab1f7c..7b734e666 100644
--- a/lib/inc-proc-eng.h
+++ b/lib/inc-proc-eng.h
@@ -115,6 +115,13 @@  enum engine_node_state {
     EN_STATE_MAX,
 };
 
+enum engine_recompute_request {
+    EN_RECOMPUTE_NONE,      /* No recompute is necessary. */
+    EN_RECOMPUTE_PARTIAL,   /* Recompute nodes found updated or aborted. */
+    EN_RECOMPUTE_FULL,      /* Full recompute requested. */
+    EN_RECOMPUTE_MAX,       /* Number of request kinds. */
+};
+
 struct engine_stats {
     uint64_t recompute;
     uint64_t compute;
@@ -141,6 +148,9 @@  struct engine_node {
     /* State of the node after the last engine run. */
     enum engine_node_state state;
 
+    /* This node needs to be processed by the upcoming partial recompute. */
+    bool pending_recompute;
+
     /* Method to allocate and initialize node data. It may be NULL.
      * The user supplied argument 'arg' is passed from the call to
      * engine_init().
@@ -216,7 +226,7 @@  void engine_add_input(struct engine_node *node, struct engine_node *input,
  * in circumstances when we are not sure there is change or not, or
  * when there is change but the engine couldn't be executed in that
  * iteration, and the change can't be tracked across iterations */
-void engine_set_force_recompute(bool val);
+void engine_request_recompute(enum engine_recompute_request val);
 
 /* Return the current engine_context. The values in the context can be NULL
  * if the engine is run with allow_recompute == false in the current
diff --git a/northd/inc-proc-northd.c b/northd/inc-proc-northd.c
index af55221e3..63c7faa7d 100644
--- a/northd/inc-proc-northd.c
+++ b/northd/inc-proc-northd.c
@@ -258,7 +258,7 @@  void inc_proc_northd_run(struct ovsdb_idl_txn *ovnnb_txn,
      * force-recompute request if 'recompute' is false.
      */
     if (recompute) {
-        engine_set_force_recompute(recompute);
+        engine_request_recompute(EN_RECOMPUTE_FULL);
     }
 
     struct engine_context eng_ctx = {
@@ -275,17 +275,17 @@  void inc_proc_northd_run(struct ovsdb_idl_txn *ovnnb_txn,
     if (!engine_has_run()) {
         if (engine_need_run()) {
             VLOG_DBG("engine did not run, force recompute next time.");
-            engine_set_force_recompute(true);
+            engine_request_recompute(EN_RECOMPUTE_FULL);
             poll_immediate_wake();
         } else {
             VLOG_DBG("engine did not run, and it was not needed");
         }
     } else if (engine_aborted()) {
         VLOG_DBG("engine was aborted, force recompute next time.");
-        engine_set_force_recompute(true);
+        engine_request_recompute(EN_RECOMPUTE_FULL);
         poll_immediate_wake();
     } else {
-        engine_set_force_recompute(false);
+        engine_request_recompute(EN_RECOMPUTE_NONE);
     }
 }