@@ -1942,7 +1942,6 @@ main(int argc, char *argv[])
&pending_pkt);
uint64_t engine_run_id = 0;
- uint64_t old_engine_run_id = 0;
bool engine_run_done = true;
unsigned int ovs_cond_seqno = UINT_MAX;
@@ -1952,10 +1951,11 @@ main(int argc, char *argv[])
exiting = false;
restart = false;
while (!exiting) {
+ engine_run_id++;
+
update_sb_db(ovs_idl_loop.idl, ovnsb_idl_loop.idl);
update_ssl_config(ovsrec_ssl_table_get(ovs_idl_loop.idl));
ofctrl_set_probe_interval(get_ofctrl_probe_interval(ovs_idl_loop.idl));
- old_engine_run_id = engine_run_id;
struct ovsdb_idl_txn *ovs_idl_txn = ovsdb_idl_loop_run(&ovs_idl_loop);
unsigned int new_ovs_cond_seqno
@@ -2047,12 +2047,12 @@ main(int argc, char *argv[])
if (engine_run_done) {
engine_set_abort_recompute(true);
engine_run_done = engine_run(&en_flow_output,
- ++engine_run_id);
+ engine_run_id);
}
} else {
engine_set_abort_recompute(false);
engine_run_done = true;
- engine_run(&en_flow_output, ++engine_run_id);
+ engine_run(&en_flow_output, engine_run_id);
}
}
stopwatch_stop(CONTROLLER_LOOP_STOPWATCH_NAME,
@@ -2095,17 +2095,20 @@ main(int argc, char *argv[])
}
}
- if (old_engine_run_id == engine_run_id || !engine_run_done) {
- if (!engine_run_done || engine_need_run(&en_flow_output)) {
- VLOG_DBG("engine did not run, force recompute next time: "
- "br_int %p, chassis %p", br_int, chassis);
- engine_set_force_recompute(true);
- poll_immediate_wake();
- } else {
- VLOG_DBG("engine did not run, and it was not needed"
- " either: br_int %p, chassis %p",
- br_int, chassis);
- }
+ if (engine_need_run(&en_flow_output, engine_run_id)) {
+ VLOG_DBG("engine did not run, force recompute next time: "
+ "br_int %p, chassis %p", br_int, chassis);
+ engine_set_force_recompute(true);
+ poll_immediate_wake();
+ } else if (!engine_run_done) {
+ VLOG_DBG("engine was aborted, force recompute next time: "
+ "br_int %p, chassis %p", br_int, chassis);
+ engine_set_force_recompute(true);
+ poll_immediate_wake();
+ } else if (!engine_has_run(&en_flow_output, engine_run_id)) {
+ VLOG_DBG("engine did not run, and it was not needed"
+ " either: br_int %p, chassis %p",
+ br_int, chassis);
} else {
engine_set_force_recompute(false);
}
@@ -129,14 +129,72 @@ engine_ovsdb_node_add_index(struct engine_node *node, const char *name,
}
bool
-engine_run(struct engine_node *node, uint64_t run_id)
+engine_has_run(struct engine_node *node, uint64_t run_id)
+{
+ return node->run_id == run_id;
+}
+
+/* Do a full recompute (or at least try). If we're not allowed then
+ * mark the node as "aborted".
+ */
+static bool
+engine_recompute(struct engine_node *node, bool forced, bool allowed)
+{
+ VLOG_DBG("node: %s, recompute (%s)", node->name,
+ forced ? "forced" : "triggered");
+
+ if (!allowed) {
+ VLOG_DBG("node: %s, recompute aborted", node->name);
+ return false;
+ }
+
+ node->run(node);
+ VLOG_DBG("node: %s, changed: %d", node->name, node->changed);
+ return true;
+}
+
+/* Return true if the node could be computed without triggerring a full
+ * recompute.
+ */
+static bool
+engine_compute(struct engine_node *node, bool recompute_allowed)
+{
+ for (size_t i = 0; i < node->n_inputs; i++) {
+ /* If the input node data changed call its change handler. */
+ if (node->inputs[i].node->changed) {
+ VLOG_DBG("node: %s, handle change for input %s",
+ node->name, node->inputs[i].node->name);
+
+ /* If the input change can't be handled incrementally, run
+ * the node handler.
+ */
+ if (!node->inputs[i].change_handler(node)) {
+ VLOG_DBG("node: %s, can't handle change for input %s, "
+ "fall back to recompute",
+ node->name, node->inputs[i].node->name);
+ if (!engine_recompute(node, false, recompute_allowed)) {
+ return false;
+ }
+ }
+ }
+ }
+
+ return true;
+}
+
+bool engine_run(struct engine_node *node, uint64_t run_id)
{
if (node->run_id == run_id) {
+ /* The node was already updated in this run (could be input for
+ * multiple other nodes). Stop processing.
+ */
return true;
}
- node->run_id = run_id;
+ /* Initialize the node for this run. */
+ node->run_id = run_id;
node->changed = false;
+
if (!node->n_inputs) {
node->run(node);
VLOG_DBG("node: %s, changed: %d", node->name, node->changed);
@@ -150,59 +208,45 @@ engine_run(struct engine_node *node, uint64_t run_id)
}
bool need_compute = false;
- bool need_recompute = false;
if (engine_force_recompute) {
- need_recompute = true;
- } else {
- for (size_t i = 0; i < node->n_inputs; i++) {
- if (node->inputs[i].node->changed) {
- need_compute = true;
- if (!node->inputs[i].change_handler) {
- need_recompute = true;
- break;
- }
- }
- }
+ return engine_recompute(node, true, !engine_abort_recompute);
}
- if (need_recompute) {
- VLOG_DBG("node: %s, recompute (%s)", node->name,
- engine_force_recompute ? "forced" : "triggered");
- if (engine_abort_recompute) {
- VLOG_DBG("node: %s, recompute aborted", node->name);
- return false;
- }
- node->run(node);
- } else if (need_compute) {
- for (size_t i = 0; i < node->n_inputs; i++) {
- if (node->inputs[i].node->changed) {
- VLOG_DBG("node: %s, handle change for input %s",
- node->name, node->inputs[i].node->name);
- if (!node->inputs[i].change_handler(node)) {
- VLOG_DBG("node: %s, can't handle change for input %s, "
- "fall back to recompute",
- node->name, node->inputs[i].node->name);
- if (engine_abort_recompute) {
- VLOG_DBG("node: %s, recompute aborted", node->name);
- return false;
- }
- node->run(node);
- break;
- }
+ /* If any of the inputs updated data but there is no change_handler, then
+ * recompute the current node too.
+ */
+ for (size_t i = 0; i < node->n_inputs; i++) {
+ if (node->inputs[i].node->changed) {
+ need_compute = true;
+
+ /* Trigger a recompute if we don't have a change handler. */
+ if (!node->inputs[i].change_handler) {
+ return engine_recompute(node, false, !engine_abort_recompute);
}
}
}
+ if (need_compute) {
+ /* If we couldn't compute the node we either aborted or triggered
+ * a full recompute. In any case, stop processing.
+ */
+ return engine_compute(node, !engine_abort_recompute);
+ }
+
VLOG_DBG("node: %s, changed: %d", node->name, node->changed);
return true;
}
bool
-engine_need_run(struct engine_node *node)
+engine_need_run(struct engine_node *node, uint64_t run_id)
{
size_t i;
+ if (node->run_id == run_id) {
+ return false;
+ }
+
if (!node->n_inputs) {
node->run(node);
VLOG_DBG("input node: %s, changed: %d", node->name, node->changed);
@@ -210,7 +254,7 @@ engine_need_run(struct engine_node *node)
}
for (i = 0; i < node->n_inputs; i++) {
- if (engine_need_run(node->inputs[i].node)) {
+ if (engine_need_run(node->inputs[i].node, run_id)) {
return true;
}
}
@@ -130,9 +130,9 @@ bool engine_run(struct engine_node *, uint64_t run_id);
* terminates. */
void engine_cleanup(struct engine_node *);
-/* Check if engine needs to run, i.e. any change to be processed. */
+/* Check if engine needs to run but didn't. */
bool
-engine_need_run(struct engine_node *);
+engine_need_run(struct engine_node *, uint64_t run_id);
/* Get the input node with <name> for <node> */
struct engine_node * engine_get_input(const char *input_name,
@@ -159,6 +159,9 @@ const struct engine_context * engine_get_context(void);
void engine_set_context(const struct engine_context *);
+/* Return true if the engine has run for 'node' in the 'run_id' iteration. */
+bool engine_has_run(struct engine_node *node, uint64_t run_id);
+
struct ed_ovsdb_index {
const char *name;
struct ovsdb_idl_index *index;
This commit simplifies the logic of calling engine_run and engine_need_run in order to reduce the number of external variables required to track the result of the last engine execution. The engine code is also refactored a bit and the engine_run() function is split in different functions that handle computing/recomputing a node. Signed-off-by: Dumitru Ceara <dceara@redhat.com> --- controller/ovn-controller.c | 33 ++++++----- lib/inc-proc-eng.c | 124 +++++++++++++++++++++++++++++-------------- lib/inc-proc-eng.h | 7 ++ 3 files changed, 107 insertions(+), 57 deletions(-)