Message ID | 20191114170455.30391.22041.stgit@dceara.remote.csb |
---|---|
State | Superseded |
Headers | show |
Series | [ovs-dev,v3,ovn,1/4] ovn-controller: Refactor I-P engine_run() tracking. | expand |
On Thu, Nov 14, 2019 at 10:39 PM Dumitru Ceara <dceara@redhat.com> wrote: > > This commit simplifies the logic of calling engine_run and engine_need_run in > order to reduce the number of external variables required to track the result > of the last engine execution. > > The engine code is also refactored a bit and the engine_run() function is > split in different functions that handle computing/recomputing a node. > > Signed-off-by: Dumitru Ceara <dceara@redhat.com> > --- > controller/ovn-controller.c | 33 ++++++----- > lib/inc-proc-eng.c | 124 +++++++++++++++++++++++++++++-------------- > lib/inc-proc-eng.h | 7 ++ > 3 files changed, 107 insertions(+), 57 deletions(-) > > diff --git a/controller/ovn-controller.c b/controller/ovn-controller.c > index 9ab98be..3922f3d 100644 > --- a/controller/ovn-controller.c > +++ b/controller/ovn-controller.c > @@ -1942,7 +1942,6 @@ main(int argc, char *argv[]) > &pending_pkt); > > uint64_t engine_run_id = 0; > - uint64_t old_engine_run_id = 0; > bool engine_run_done = true; > > unsigned int ovs_cond_seqno = UINT_MAX; > @@ -1952,10 +1951,11 @@ main(int argc, char *argv[]) > exiting = false; > restart = false; > while (!exiting) { > + engine_run_id++; > + > update_sb_db(ovs_idl_loop.idl, ovnsb_idl_loop.idl); > update_ssl_config(ovsrec_ssl_table_get(ovs_idl_loop.idl)); > ofctrl_set_probe_interval(get_ofctrl_probe_interval(ovs_idl_loop.idl)); > - old_engine_run_id = engine_run_id; > > struct ovsdb_idl_txn *ovs_idl_txn = ovsdb_idl_loop_run(&ovs_idl_loop); > unsigned int new_ovs_cond_seqno > @@ -2047,12 +2047,12 @@ main(int argc, char *argv[]) > if (engine_run_done) { > engine_set_abort_recompute(true); > engine_run_done = engine_run(&en_flow_output, > - ++engine_run_id); > + engine_run_id); > } > } else { > engine_set_abort_recompute(false); > engine_run_done = true; > - engine_run(&en_flow_output, ++engine_run_id); > + engine_run(&en_flow_output, engine_run_id); > } > } > stopwatch_stop(CONTROLLER_LOOP_STOPWATCH_NAME, > @@ -2095,17 +2095,20 @@ main(int argc, char *argv[]) > } > > } > - if (old_engine_run_id == engine_run_id || !engine_run_done) { > - if (!engine_run_done || engine_need_run(&en_flow_output)) { > - VLOG_DBG("engine did not run, force recompute next time: " > - "br_int %p, chassis %p", br_int, chassis); > - engine_set_force_recompute(true); > - poll_immediate_wake(); > - } else { > - VLOG_DBG("engine did not run, and it was not needed" > - " either: br_int %p, chassis %p", > - br_int, chassis); > - } > + if (engine_need_run(&en_flow_output, engine_run_id)) { > + VLOG_DBG("engine did not run, force recompute next time: " > + "br_int %p, chassis %p", br_int, chassis); > + engine_set_force_recompute(true); > + poll_immediate_wake(); > + } else if (!engine_run_done) { > + VLOG_DBG("engine was aborted, force recompute next time: " > + "br_int %p, chassis %p", br_int, chassis); > + engine_set_force_recompute(true); > + poll_immediate_wake(); > + } else if (!engine_has_run(&en_flow_output, engine_run_id)) { > + VLOG_DBG("engine did not run, and it was not needed" > + " either: br_int %p, chassis %p", > + br_int, chassis); > } else { > engine_set_force_recompute(false); > } > diff --git a/lib/inc-proc-eng.c b/lib/inc-proc-eng.c > index 1064a08..8a085e2 100644 > --- a/lib/inc-proc-eng.c > +++ b/lib/inc-proc-eng.c > @@ -129,14 +129,72 @@ engine_ovsdb_node_add_index(struct engine_node *node, const char *name, > } > > bool > -engine_run(struct engine_node *node, uint64_t run_id) > +engine_has_run(struct engine_node *node, uint64_t run_id) > +{ > + return node->run_id == run_id; > +} > + > +/* Do a full recompute (or at least try). If we're not allowed then > + * mark the node as "aborted". > + */ > +static bool > +engine_recompute(struct engine_node *node, bool forced, bool allowed) > +{ > + VLOG_DBG("node: %s, recompute (%s)", node->name, > + forced ? "forced" : "triggered"); > + > + if (!allowed) { > + VLOG_DBG("node: %s, recompute aborted", node->name); > + return false; > + } > + > + node->run(node); > + VLOG_DBG("node: %s, changed: %d", node->name, node->changed); > + return true; > +} > + > +/* Return true if the node could be computed without triggerring a full > + * recompute. > + */ > +static bool > +engine_compute(struct engine_node *node, bool recompute_allowed) > +{ > + for (size_t i = 0; i < node->n_inputs; i++) { > + /* If the input node data changed call its change handler. */ > + if (node->inputs[i].node->changed) { > + VLOG_DBG("node: %s, handle change for input %s", > + node->name, node->inputs[i].node->name); > + > + /* If the input change can't be handled incrementally, run > + * the node handler. > + */ > + if (!node->inputs[i].change_handler(node)) { Before calling change_handler, we need to check if its is NULL or not right ? Thanks Numan > + VLOG_DBG("node: %s, can't handle change for input %s, " > + "fall back to recompute", > + node->name, node->inputs[i].node->name); > + if (!engine_recompute(node, false, recompute_allowed)) { > + return false; > + } > + } > + } > + } > + > + return true; > +} > + > +bool engine_run(struct engine_node *node, uint64_t run_id) > { > if (node->run_id == run_id) { > + /* The node was already updated in this run (could be input for > + * multiple other nodes). Stop processing. > + */ > return true; > } > - node->run_id = run_id; > > + /* Initialize the node for this run. */ > + node->run_id = run_id; > node->changed = false; > + > if (!node->n_inputs) { > node->run(node); > VLOG_DBG("node: %s, changed: %d", node->name, node->changed); > @@ -150,59 +208,45 @@ engine_run(struct engine_node *node, uint64_t run_id) > } > > bool need_compute = false; > - bool need_recompute = false; > > if (engine_force_recompute) { > - need_recompute = true; > - } else { > - for (size_t i = 0; i < node->n_inputs; i++) { > - if (node->inputs[i].node->changed) { > - need_compute = true; > - if (!node->inputs[i].change_handler) { > - need_recompute = true; > - break; > - } > - } > - } > + return engine_recompute(node, true, !engine_abort_recompute); > } > > - if (need_recompute) { > - VLOG_DBG("node: %s, recompute (%s)", node->name, > - engine_force_recompute ? "forced" : "triggered"); > - if (engine_abort_recompute) { > - VLOG_DBG("node: %s, recompute aborted", node->name); > - return false; > - } > - node->run(node); > - } else if (need_compute) { > - for (size_t i = 0; i < node->n_inputs; i++) { > - if (node->inputs[i].node->changed) { > - VLOG_DBG("node: %s, handle change for input %s", > - node->name, node->inputs[i].node->name); > - if (!node->inputs[i].change_handler(node)) { > - VLOG_DBG("node: %s, can't handle change for input %s, " > - "fall back to recompute", > - node->name, node->inputs[i].node->name); > - if (engine_abort_recompute) { > - VLOG_DBG("node: %s, recompute aborted", node->name); > - return false; > - } > - node->run(node); > - break; > - } > + /* If any of the inputs updated data but there is no change_handler, then > + * recompute the current node too. > + */ > + for (size_t i = 0; i < node->n_inputs; i++) { > + if (node->inputs[i].node->changed) { > + need_compute = true; > + > + /* Trigger a recompute if we don't have a change handler. */ > + if (!node->inputs[i].change_handler) { > + return engine_recompute(node, false, !engine_abort_recompute); > } > } > } > > + if (need_compute) { > + /* If we couldn't compute the node we either aborted or triggered > + * a full recompute. In any case, stop processing. > + */ > + return engine_compute(node, !engine_abort_recompute); > + } > + > VLOG_DBG("node: %s, changed: %d", node->name, node->changed); > return true; > } > > bool > -engine_need_run(struct engine_node *node) > +engine_need_run(struct engine_node *node, uint64_t run_id) > { > size_t i; > > + if (node->run_id == run_id) { > + return false; > + } > + > if (!node->n_inputs) { > node->run(node); > VLOG_DBG("input node: %s, changed: %d", node->name, node->changed); > @@ -210,7 +254,7 @@ engine_need_run(struct engine_node *node) > } > > for (i = 0; i < node->n_inputs; i++) { > - if (engine_need_run(node->inputs[i].node)) { > + if (engine_need_run(node->inputs[i].node, run_id)) { > return true; > } > } > diff --git a/lib/inc-proc-eng.h b/lib/inc-proc-eng.h > index 3a69dc2..abd41b2 100644 > --- a/lib/inc-proc-eng.h > +++ b/lib/inc-proc-eng.h > @@ -130,9 +130,9 @@ bool engine_run(struct engine_node *, uint64_t run_id); > * terminates. */ > void engine_cleanup(struct engine_node *); > > -/* Check if engine needs to run, i.e. any change to be processed. */ > +/* Check if engine needs to run but didn't. */ > bool > -engine_need_run(struct engine_node *); > +engine_need_run(struct engine_node *, uint64_t run_id); > > /* Get the input node with <name> for <node> */ > struct engine_node * engine_get_input(const char *input_name, > @@ -159,6 +159,9 @@ const struct engine_context * engine_get_context(void); > > void engine_set_context(const struct engine_context *); > > +/* Return true if the engine has run for 'node' in the 'run_id' iteration. */ > +bool engine_has_run(struct engine_node *node, uint64_t run_id); > + > struct ed_ovsdb_index { > const char *name; > struct ovsdb_idl_index *index; > > _______________________________________________ > dev mailing list > dev@openvswitch.org > https://mail.openvswitch.org/mailman/listinfo/ovs-dev >
On Mon, Nov 18, 2019 at 8:39 AM Numan Siddique <numans@ovn.org> wrote: > > On Thu, Nov 14, 2019 at 10:39 PM Dumitru Ceara <dceara@redhat.com> wrote: > > > > This commit simplifies the logic of calling engine_run and engine_need_run in > > order to reduce the number of external variables required to track the result > > of the last engine execution. > > > > The engine code is also refactored a bit and the engine_run() function is > > split in different functions that handle computing/recomputing a node. > > > > Signed-off-by: Dumitru Ceara <dceara@redhat.com> > > --- > > controller/ovn-controller.c | 33 ++++++----- > > lib/inc-proc-eng.c | 124 +++++++++++++++++++++++++++++-------------- > > lib/inc-proc-eng.h | 7 ++ > > 3 files changed, 107 insertions(+), 57 deletions(-) > > > > diff --git a/controller/ovn-controller.c b/controller/ovn-controller.c > > index 9ab98be..3922f3d 100644 > > --- a/controller/ovn-controller.c > > +++ b/controller/ovn-controller.c > > @@ -1942,7 +1942,6 @@ main(int argc, char *argv[]) > > &pending_pkt); > > > > uint64_t engine_run_id = 0; > > - uint64_t old_engine_run_id = 0; > > bool engine_run_done = true; > > > > unsigned int ovs_cond_seqno = UINT_MAX; > > @@ -1952,10 +1951,11 @@ main(int argc, char *argv[]) > > exiting = false; > > restart = false; > > while (!exiting) { > > + engine_run_id++; > > + > > update_sb_db(ovs_idl_loop.idl, ovnsb_idl_loop.idl); > > update_ssl_config(ovsrec_ssl_table_get(ovs_idl_loop.idl)); > > ofctrl_set_probe_interval(get_ofctrl_probe_interval(ovs_idl_loop.idl)); > > - old_engine_run_id = engine_run_id; > > > > struct ovsdb_idl_txn *ovs_idl_txn = ovsdb_idl_loop_run(&ovs_idl_loop); > > unsigned int new_ovs_cond_seqno > > @@ -2047,12 +2047,12 @@ main(int argc, char *argv[]) > > if (engine_run_done) { > > engine_set_abort_recompute(true); > > engine_run_done = engine_run(&en_flow_output, > > - ++engine_run_id); > > + engine_run_id); > > } > > } else { > > engine_set_abort_recompute(false); > > engine_run_done = true; > > - engine_run(&en_flow_output, ++engine_run_id); > > + engine_run(&en_flow_output, engine_run_id); > > } > > } > > stopwatch_stop(CONTROLLER_LOOP_STOPWATCH_NAME, > > @@ -2095,17 +2095,20 @@ main(int argc, char *argv[]) > > } > > > > } > > - if (old_engine_run_id == engine_run_id || !engine_run_done) { > > - if (!engine_run_done || engine_need_run(&en_flow_output)) { > > - VLOG_DBG("engine did not run, force recompute next time: " > > - "br_int %p, chassis %p", br_int, chassis); > > - engine_set_force_recompute(true); > > - poll_immediate_wake(); > > - } else { > > - VLOG_DBG("engine did not run, and it was not needed" > > - " either: br_int %p, chassis %p", > > - br_int, chassis); > > - } > > + if (engine_need_run(&en_flow_output, engine_run_id)) { > > + VLOG_DBG("engine did not run, force recompute next time: " > > + "br_int %p, chassis %p", br_int, chassis); > > + engine_set_force_recompute(true); > > + poll_immediate_wake(); > > + } else if (!engine_run_done) { > > + VLOG_DBG("engine was aborted, force recompute next time: " > > + "br_int %p, chassis %p", br_int, chassis); > > + engine_set_force_recompute(true); > > + poll_immediate_wake(); > > + } else if (!engine_has_run(&en_flow_output, engine_run_id)) { > > + VLOG_DBG("engine did not run, and it was not needed" > > + " either: br_int %p, chassis %p", > > + br_int, chassis); > > } else { > > engine_set_force_recompute(false); > > } > > diff --git a/lib/inc-proc-eng.c b/lib/inc-proc-eng.c > > index 1064a08..8a085e2 100644 > > --- a/lib/inc-proc-eng.c > > +++ b/lib/inc-proc-eng.c > > @@ -129,14 +129,72 @@ engine_ovsdb_node_add_index(struct engine_node *node, const char *name, > > } > > > > bool > > -engine_run(struct engine_node *node, uint64_t run_id) > > +engine_has_run(struct engine_node *node, uint64_t run_id) > > +{ > > + return node->run_id == run_id; > > +} > > + > > +/* Do a full recompute (or at least try). If we're not allowed then > > + * mark the node as "aborted". > > + */ > > +static bool > > +engine_recompute(struct engine_node *node, bool forced, bool allowed) > > +{ > > + VLOG_DBG("node: %s, recompute (%s)", node->name, > > + forced ? "forced" : "triggered"); > > + > > + if (!allowed) { > > + VLOG_DBG("node: %s, recompute aborted", node->name); > > + return false; > > + } > > + > > + node->run(node); > > + VLOG_DBG("node: %s, changed: %d", node->name, node->changed); > > + return true; > > +} > > + > > +/* Return true if the node could be computed without triggerring a full > > + * recompute. > > + */ > > +static bool > > +engine_compute(struct engine_node *node, bool recompute_allowed) > > +{ > > + for (size_t i = 0; i < node->n_inputs; i++) { > > + /* If the input node data changed call its change handler. */ > > + if (node->inputs[i].node->changed) { > > + VLOG_DBG("node: %s, handle change for input %s", > > + node->name, node->inputs[i].node->name); > > + > > + /* If the input change can't be handled incrementally, run > > + * the node handler. > > + */ > > + if (!node->inputs[i].change_handler(node)) { > > Before calling change_handler, we need to check if its is NULL or not right ? > > Thanks > Numan Hi Numan, Thanks for reviewing this series. In this case it's safe to assume change_handler is not NULL because engine_compute() is called only if one of the node's inputs is in EN_UPDATED and node->inputs[i].change_handler != NULL. If the input change_handler is NULL we always call engine_recompute(...). Regards, Dumitru > > > + VLOG_DBG("node: %s, can't handle change for input %s, " > > + "fall back to recompute", > > + node->name, node->inputs[i].node->name); > > + if (!engine_recompute(node, false, recompute_allowed)) { > > + return false; > > + } > > + } > > + } > > + } > > + > > + return true; > > +} > > + > > +bool engine_run(struct engine_node *node, uint64_t run_id) > > { > > if (node->run_id == run_id) { > > + /* The node was already updated in this run (could be input for > > + * multiple other nodes). Stop processing. > > + */ > > return true; > > } > > - node->run_id = run_id; > > > > + /* Initialize the node for this run. */ > > + node->run_id = run_id; > > node->changed = false; > > + > > if (!node->n_inputs) { > > node->run(node); > > VLOG_DBG("node: %s, changed: %d", node->name, node->changed); > > @@ -150,59 +208,45 @@ engine_run(struct engine_node *node, uint64_t run_id) > > } > > > > bool need_compute = false; > > - bool need_recompute = false; > > > > if (engine_force_recompute) { > > - need_recompute = true; > > - } else { > > - for (size_t i = 0; i < node->n_inputs; i++) { > > - if (node->inputs[i].node->changed) { > > - need_compute = true; > > - if (!node->inputs[i].change_handler) { > > - need_recompute = true; > > - break; > > - } > > - } > > - } > > + return engine_recompute(node, true, !engine_abort_recompute); > > } > > > > - if (need_recompute) { > > - VLOG_DBG("node: %s, recompute (%s)", node->name, > > - engine_force_recompute ? "forced" : "triggered"); > > - if (engine_abort_recompute) { > > - VLOG_DBG("node: %s, recompute aborted", node->name); > > - return false; > > - } > > - node->run(node); > > - } else if (need_compute) { > > - for (size_t i = 0; i < node->n_inputs; i++) { > > - if (node->inputs[i].node->changed) { > > - VLOG_DBG("node: %s, handle change for input %s", > > - node->name, node->inputs[i].node->name); > > - if (!node->inputs[i].change_handler(node)) { > > - VLOG_DBG("node: %s, can't handle change for input %s, " > > - "fall back to recompute", > > - node->name, node->inputs[i].node->name); > > - if (engine_abort_recompute) { > > - VLOG_DBG("node: %s, recompute aborted", node->name); > > - return false; > > - } > > - node->run(node); > > - break; > > - } > > + /* If any of the inputs updated data but there is no change_handler, then > > + * recompute the current node too. > > + */ > > + for (size_t i = 0; i < node->n_inputs; i++) { > > + if (node->inputs[i].node->changed) { > > + need_compute = true; > > + > > + /* Trigger a recompute if we don't have a change handler. */ > > + if (!node->inputs[i].change_handler) { > > + return engine_recompute(node, false, !engine_abort_recompute); > > } > > } > > } > > > > + if (need_compute) { > > + /* If we couldn't compute the node we either aborted or triggered > > + * a full recompute. In any case, stop processing. > > + */ > > + return engine_compute(node, !engine_abort_recompute); > > + } > > + > > VLOG_DBG("node: %s, changed: %d", node->name, node->changed); > > return true; > > } > > > > bool > > -engine_need_run(struct engine_node *node) > > +engine_need_run(struct engine_node *node, uint64_t run_id) > > { > > size_t i; > > > > + if (node->run_id == run_id) { > > + return false; > > + } > > + > > if (!node->n_inputs) { > > node->run(node); > > VLOG_DBG("input node: %s, changed: %d", node->name, node->changed); > > @@ -210,7 +254,7 @@ engine_need_run(struct engine_node *node) > > } > > > > for (i = 0; i < node->n_inputs; i++) { > > - if (engine_need_run(node->inputs[i].node)) { > > + if (engine_need_run(node->inputs[i].node, run_id)) { > > return true; > > } > > } > > diff --git a/lib/inc-proc-eng.h b/lib/inc-proc-eng.h > > index 3a69dc2..abd41b2 100644 > > --- a/lib/inc-proc-eng.h > > +++ b/lib/inc-proc-eng.h > > @@ -130,9 +130,9 @@ bool engine_run(struct engine_node *, uint64_t run_id); > > * terminates. */ > > void engine_cleanup(struct engine_node *); > > > > -/* Check if engine needs to run, i.e. any change to be processed. */ > > +/* Check if engine needs to run but didn't. */ > > bool > > -engine_need_run(struct engine_node *); > > +engine_need_run(struct engine_node *, uint64_t run_id); > > > > /* Get the input node with <name> for <node> */ > > struct engine_node * engine_get_input(const char *input_name, > > @@ -159,6 +159,9 @@ const struct engine_context * engine_get_context(void); > > > > void engine_set_context(const struct engine_context *); > > > > +/* Return true if the engine has run for 'node' in the 'run_id' iteration. */ > > +bool engine_has_run(struct engine_node *node, uint64_t run_id); > > + > > struct ed_ovsdb_index { > > const char *name; > > struct ovsdb_idl_index *index; > > > > _______________________________________________ > > dev mailing list > > dev@openvswitch.org > > https://mail.openvswitch.org/mailman/listinfo/ovs-dev > > >
diff --git a/controller/ovn-controller.c b/controller/ovn-controller.c index 9ab98be..3922f3d 100644 --- a/controller/ovn-controller.c +++ b/controller/ovn-controller.c @@ -1942,7 +1942,6 @@ main(int argc, char *argv[]) &pending_pkt); uint64_t engine_run_id = 0; - uint64_t old_engine_run_id = 0; bool engine_run_done = true; unsigned int ovs_cond_seqno = UINT_MAX; @@ -1952,10 +1951,11 @@ main(int argc, char *argv[]) exiting = false; restart = false; while (!exiting) { + engine_run_id++; + update_sb_db(ovs_idl_loop.idl, ovnsb_idl_loop.idl); update_ssl_config(ovsrec_ssl_table_get(ovs_idl_loop.idl)); ofctrl_set_probe_interval(get_ofctrl_probe_interval(ovs_idl_loop.idl)); - old_engine_run_id = engine_run_id; struct ovsdb_idl_txn *ovs_idl_txn = ovsdb_idl_loop_run(&ovs_idl_loop); unsigned int new_ovs_cond_seqno @@ -2047,12 +2047,12 @@ main(int argc, char *argv[]) if (engine_run_done) { engine_set_abort_recompute(true); engine_run_done = engine_run(&en_flow_output, - ++engine_run_id); + engine_run_id); } } else { engine_set_abort_recompute(false); engine_run_done = true; - engine_run(&en_flow_output, ++engine_run_id); + engine_run(&en_flow_output, engine_run_id); } } stopwatch_stop(CONTROLLER_LOOP_STOPWATCH_NAME, @@ -2095,17 +2095,20 @@ main(int argc, char *argv[]) } } - if (old_engine_run_id == engine_run_id || !engine_run_done) { - if (!engine_run_done || engine_need_run(&en_flow_output)) { - VLOG_DBG("engine did not run, force recompute next time: " - "br_int %p, chassis %p", br_int, chassis); - engine_set_force_recompute(true); - poll_immediate_wake(); - } else { - VLOG_DBG("engine did not run, and it was not needed" - " either: br_int %p, chassis %p", - br_int, chassis); - } + if (engine_need_run(&en_flow_output, engine_run_id)) { + VLOG_DBG("engine did not run, force recompute next time: " + "br_int %p, chassis %p", br_int, chassis); + engine_set_force_recompute(true); + poll_immediate_wake(); + } else if (!engine_run_done) { + VLOG_DBG("engine was aborted, force recompute next time: " + "br_int %p, chassis %p", br_int, chassis); + engine_set_force_recompute(true); + poll_immediate_wake(); + } else if (!engine_has_run(&en_flow_output, engine_run_id)) { + VLOG_DBG("engine did not run, and it was not needed" + " either: br_int %p, chassis %p", + br_int, chassis); } else { engine_set_force_recompute(false); } diff --git a/lib/inc-proc-eng.c b/lib/inc-proc-eng.c index 1064a08..8a085e2 100644 --- a/lib/inc-proc-eng.c +++ b/lib/inc-proc-eng.c @@ -129,14 +129,72 @@ engine_ovsdb_node_add_index(struct engine_node *node, const char *name, } bool -engine_run(struct engine_node *node, uint64_t run_id) +engine_has_run(struct engine_node *node, uint64_t run_id) +{ + return node->run_id == run_id; +} + +/* Do a full recompute (or at least try). If we're not allowed then + * mark the node as "aborted". + */ +static bool +engine_recompute(struct engine_node *node, bool forced, bool allowed) +{ + VLOG_DBG("node: %s, recompute (%s)", node->name, + forced ? "forced" : "triggered"); + + if (!allowed) { + VLOG_DBG("node: %s, recompute aborted", node->name); + return false; + } + + node->run(node); + VLOG_DBG("node: %s, changed: %d", node->name, node->changed); + return true; +} + +/* Return true if the node could be computed without triggerring a full + * recompute. + */ +static bool +engine_compute(struct engine_node *node, bool recompute_allowed) +{ + for (size_t i = 0; i < node->n_inputs; i++) { + /* If the input node data changed call its change handler. */ + if (node->inputs[i].node->changed) { + VLOG_DBG("node: %s, handle change for input %s", + node->name, node->inputs[i].node->name); + + /* If the input change can't be handled incrementally, run + * the node handler. + */ + if (!node->inputs[i].change_handler(node)) { + VLOG_DBG("node: %s, can't handle change for input %s, " + "fall back to recompute", + node->name, node->inputs[i].node->name); + if (!engine_recompute(node, false, recompute_allowed)) { + return false; + } + } + } + } + + return true; +} + +bool engine_run(struct engine_node *node, uint64_t run_id) { if (node->run_id == run_id) { + /* The node was already updated in this run (could be input for + * multiple other nodes). Stop processing. + */ return true; } - node->run_id = run_id; + /* Initialize the node for this run. */ + node->run_id = run_id; node->changed = false; + if (!node->n_inputs) { node->run(node); VLOG_DBG("node: %s, changed: %d", node->name, node->changed); @@ -150,59 +208,45 @@ engine_run(struct engine_node *node, uint64_t run_id) } bool need_compute = false; - bool need_recompute = false; if (engine_force_recompute) { - need_recompute = true; - } else { - for (size_t i = 0; i < node->n_inputs; i++) { - if (node->inputs[i].node->changed) { - need_compute = true; - if (!node->inputs[i].change_handler) { - need_recompute = true; - break; - } - } - } + return engine_recompute(node, true, !engine_abort_recompute); } - if (need_recompute) { - VLOG_DBG("node: %s, recompute (%s)", node->name, - engine_force_recompute ? "forced" : "triggered"); - if (engine_abort_recompute) { - VLOG_DBG("node: %s, recompute aborted", node->name); - return false; - } - node->run(node); - } else if (need_compute) { - for (size_t i = 0; i < node->n_inputs; i++) { - if (node->inputs[i].node->changed) { - VLOG_DBG("node: %s, handle change for input %s", - node->name, node->inputs[i].node->name); - if (!node->inputs[i].change_handler(node)) { - VLOG_DBG("node: %s, can't handle change for input %s, " - "fall back to recompute", - node->name, node->inputs[i].node->name); - if (engine_abort_recompute) { - VLOG_DBG("node: %s, recompute aborted", node->name); - return false; - } - node->run(node); - break; - } + /* If any of the inputs updated data but there is no change_handler, then + * recompute the current node too. + */ + for (size_t i = 0; i < node->n_inputs; i++) { + if (node->inputs[i].node->changed) { + need_compute = true; + + /* Trigger a recompute if we don't have a change handler. */ + if (!node->inputs[i].change_handler) { + return engine_recompute(node, false, !engine_abort_recompute); } } } + if (need_compute) { + /* If we couldn't compute the node we either aborted or triggered + * a full recompute. In any case, stop processing. + */ + return engine_compute(node, !engine_abort_recompute); + } + VLOG_DBG("node: %s, changed: %d", node->name, node->changed); return true; } bool -engine_need_run(struct engine_node *node) +engine_need_run(struct engine_node *node, uint64_t run_id) { size_t i; + if (node->run_id == run_id) { + return false; + } + if (!node->n_inputs) { node->run(node); VLOG_DBG("input node: %s, changed: %d", node->name, node->changed); @@ -210,7 +254,7 @@ engine_need_run(struct engine_node *node) } for (i = 0; i < node->n_inputs; i++) { - if (engine_need_run(node->inputs[i].node)) { + if (engine_need_run(node->inputs[i].node, run_id)) { return true; } } diff --git a/lib/inc-proc-eng.h b/lib/inc-proc-eng.h index 3a69dc2..abd41b2 100644 --- a/lib/inc-proc-eng.h +++ b/lib/inc-proc-eng.h @@ -130,9 +130,9 @@ bool engine_run(struct engine_node *, uint64_t run_id); * terminates. */ void engine_cleanup(struct engine_node *); -/* Check if engine needs to run, i.e. any change to be processed. */ +/* Check if engine needs to run but didn't. */ bool -engine_need_run(struct engine_node *); +engine_need_run(struct engine_node *, uint64_t run_id); /* Get the input node with <name> for <node> */ struct engine_node * engine_get_input(const char *input_name, @@ -159,6 +159,9 @@ const struct engine_context * engine_get_context(void); void engine_set_context(const struct engine_context *); +/* Return true if the engine has run for 'node' in the 'run_id' iteration. */ +bool engine_has_run(struct engine_node *node, uint64_t run_id); + struct ed_ovsdb_index { const char *name; struct ovsdb_idl_index *index;
This commit simplifies the logic of calling engine_run and engine_need_run in order to reduce the number of external variables required to track the result of the last engine execution. The engine code is also refactored a bit and the engine_run() function is split in different functions that handle computing/recomputing a node. Signed-off-by: Dumitru Ceara <dceara@redhat.com> --- controller/ovn-controller.c | 33 ++++++----- lib/inc-proc-eng.c | 124 +++++++++++++++++++++++++++++-------------- lib/inc-proc-eng.h | 7 ++ 3 files changed, 107 insertions(+), 57 deletions(-)