Message ID | 20191114170524.30391.61326.stgit@dceara.remote.csb |
---|---|
State | Superseded |
Headers | show |
Series | [ovs-dev,v3,ovn,1/4] ovn-controller: Refactor I-P engine_run() tracking. | expand |
Thanks for this series, Dumitru. This is really helpful. A few comments below. On Thu, Nov 14, 2019 at 10:39 PM Dumitru Ceara <dceara@redhat.com> wrote: > > This commit transforms the 'changed' field in struct engine_node in a > 'state' field. Possible node states are: > - "Stale": data in the node is not up to date with the DB. > - "Updated": data in the node is valid but was updated during > the last run of the engine. > - "Valid": data in the node is valid and didn't change during > the last run of the engine. > - "Aborted": during the last run, processing was aborted for > this node. > > This commit also further refactors the I-P engine: > - instead of recursively performing all the engine processing a > preprocessing stage is added (engine_get_nodes()) before the main processing > loop is executed in order to topologically sort nodes in the engine such > that all inputs of a given node appear in the sorted array before the node > itself. This simplifies a bit the code in engine_run(). > - remove the need for using an engine_run_id by using the newly added states. > > Signed-off-by: Dumitru Ceara <dceara@redhat.com> > --- After applying this patch, I notice that adding a logical switch or logical router results in engine_run() being called twice, i.e. the function engine_need_run() always returns true when I run "ovn-nbctl ls-add sw0", so engine_run() is called again. If you enable debug logging and add a logical switch, you will see [1] being hit every time, which was not the case before this patch.
Thanks Numan > controller/ovn-controller.c | 88 ++++++++++------- > lib/inc-proc-eng.c | 218 ++++++++++++++++++++++++++++++++----------- > lib/inc-proc-eng.h | 74 +++++++++++---- > 3 files changed, 267 insertions(+), 113 deletions(-) > > diff --git a/controller/ovn-controller.c b/controller/ovn-controller.c > index 3922f3d..4f8ceae 100644 > --- a/controller/ovn-controller.c > +++ b/controller/ovn-controller.c > @@ -758,10 +758,10 @@ en_ofctrl_is_connected_run(struct engine_node *node) > (struct ed_type_ofctrl_is_connected *)node->data; > if (data->connected != ofctrl_is_connected()) { > data->connected = !data->connected; > - node->changed = true; > + engine_set_node_state(node, EN_UPDATED); > return; > } > - node->changed = false; > + engine_set_node_state(node, EN_VALID); > } > > struct ed_type_addr_sets { > @@ -811,7 +811,7 @@ en_addr_sets_run(struct engine_node *node) > addr_sets_init(as_table, &as->addr_sets); > > as->change_tracked = false; > - node->changed = true; > + engine_set_node_state(node, EN_UPDATED); > } > > static bool > @@ -830,11 +830,14 @@ addr_sets_sb_address_set_handler(struct engine_node *node) > addr_sets_update(as_table, &as->addr_sets, &as->new, > &as->deleted, &as->updated); > > - node->changed = !sset_is_empty(&as->new) || !sset_is_empty(&as->deleted) > - || !sset_is_empty(&as->updated); > + if (!sset_is_empty(&as->new) || !sset_is_empty(&as->deleted) || > + !sset_is_empty(&as->updated)) { > + engine_set_node_state(node, EN_UPDATED); > + } else { > + engine_set_node_state(node, EN_VALID); > + } > > as->change_tracked = true; > - node->changed = true; > return true; > } > > @@ -885,7 +888,7 @@ en_port_groups_run(struct engine_node *node) > port_groups_init(pg_table, &pg->port_groups); > > pg->change_tracked = false; > - node->changed = true; > + engine_set_node_state(node, EN_UPDATED); > } > > static bool > @@ -904,11 +907,14 @@ port_groups_sb_port_group_handler(struct engine_node *node) > port_groups_update(pg_table, &pg->port_groups, 
&pg->new, > &pg->deleted, &pg->updated); > > - node->changed = !sset_is_empty(&pg->new) || !sset_is_empty(&pg->deleted) > - || !sset_is_empty(&pg->updated); > + if (!sset_is_empty(&pg->new) || !sset_is_empty(&pg->deleted) || > + !sset_is_empty(&pg->updated)) { > + engine_set_node_state(node, EN_UPDATED); > + } else { > + engine_set_node_state(node, EN_VALID); > + } > > pg->change_tracked = true; > - node->changed = true; > return true; > } > > @@ -1091,7 +1097,7 @@ en_runtime_data_run(struct engine_node *node) > update_ct_zones(local_lports, local_datapaths, ct_zones, > ct_zone_bitmap, pending_ct_zones); > > - node->changed = true; > + engine_set_node_state(node, EN_UPDATED); > } > > static bool > @@ -1157,10 +1163,10 @@ en_mff_ovn_geneve_run(struct engine_node *node) > enum mf_field_id mff_ovn_geneve = ofctrl_get_mf_field_id(); > if (data->mff_ovn_geneve != mff_ovn_geneve) { > data->mff_ovn_geneve = mff_ovn_geneve; > - node->changed = true; > + engine_set_node_state(node, EN_UPDATED); > return; > } > - node->changed = false; > + engine_set_node_state(node, EN_VALID); > } > > struct ed_type_flow_output { > @@ -1322,7 +1328,7 @@ en_flow_output_run(struct engine_node *node) > active_tunnels, > flow_table); > > - node->changed = true; > + engine_set_node_state(node, EN_UPDATED); > } > > static bool > @@ -1404,7 +1410,7 @@ flow_output_sb_logical_flow_handler(struct engine_node *node) > flow_table, group_table, meter_table, lfrr, > conj_id_ofs); > > - node->changed = true; > + engine_set_node_state(node, EN_UPDATED); > return handled; > } > > @@ -1427,7 +1433,7 @@ flow_output_sb_mac_binding_handler(struct engine_node *node) > lflow_handle_changed_neighbors(sbrec_port_binding_by_name, > mac_binding_table, flow_table); > > - node->changed = true; > + engine_set_node_state(node, EN_UPDATED); > return true; > } > > @@ -1531,7 +1537,7 @@ flow_output_sb_port_binding_handler(struct engine_node *node) > chassis, ct_zones, local_datapaths, > active_tunnels, flow_table); > > - 
node->changed = true; > + engine_set_node_state(node, EN_UPDATED); > return true; > } > > @@ -1580,7 +1586,7 @@ flow_output_sb_multicast_group_handler(struct engine_node *node) > mff_ovn_geneve, chassis, ct_zones, local_datapaths, > flow_table); > > - node->changed = true; > + engine_set_node_state(node, EN_UPDATED); > return true; > > } > @@ -1694,7 +1700,9 @@ _flow_output_resource_ref_handler(struct engine_node *node, > conj_id_ofs, &changed)) { > return false; > } > - node->changed = changed || node->changed; > + if (changed) { > + engine_set_node_state(node, EN_UPDATED); > + } > } > SSET_FOR_EACH (ref_name, updated) { > if (!lflow_handle_changed_ref(ref_type, ref_name, > @@ -1707,7 +1715,9 @@ _flow_output_resource_ref_handler(struct engine_node *node, > conj_id_ofs, &changed)) { > return false; > } > - node->changed = changed || node->changed; > + if (changed) { > + engine_set_node_state(node, EN_UPDATED); > + } > } > SSET_FOR_EACH (ref_name, new) { > if (!lflow_handle_changed_ref(ref_type, ref_name, > @@ -1720,7 +1730,9 @@ _flow_output_resource_ref_handler(struct engine_node *node, > conj_id_ofs, &changed)) { > return false; > } > - node->changed = changed || node->changed; > + if (changed) { > + engine_set_node_state(node, EN_UPDATED); > + } > } > > return true; > @@ -1922,7 +1934,11 @@ main(int argc, char *argv[]) > engine_add_input(&en_runtime_data, &en_sb_port_binding, > runtime_data_sb_port_binding_handler); > > - engine_init(&en_flow_output); > + /* Get the sorted engine nodes to be used for every engine run. 
*/ > + size_t en_count = 0; > + struct engine_node **en_nodes = engine_get_nodes(&en_flow_output, > + &en_count); > + engine_init(en_nodes, en_count); > > ofctrl_init(&ed_flow_output.group_table, > &ed_flow_output.meter_table, > @@ -1941,9 +1957,6 @@ main(int argc, char *argv[]) > unixctl_command_register("inject-pkt", "MICROFLOW", 1, 1, inject_pkt, > &pending_pkt); > > - uint64_t engine_run_id = 0; > - bool engine_run_done = true; > - > unsigned int ovs_cond_seqno = UINT_MAX; > unsigned int ovnsb_cond_seqno = UINT_MAX; > > @@ -1951,7 +1964,7 @@ main(int argc, char *argv[]) > exiting = false; > restart = false; > while (!exiting) { > - engine_run_id++; > + engine_init_run(en_nodes, en_count, &en_flow_output); > > update_sb_db(ovs_idl_loop.idl, ovnsb_idl_loop.idl); > update_ssl_config(ovsrec_ssl_table_get(ovs_idl_loop.idl)); > @@ -2044,15 +2057,13 @@ main(int argc, char *argv[]) > * this round of engine_run and continue processing > * acculated changes incrementally later when > * ofctrl_can_put() returns true. 
*/ > - if (engine_run_done) { > + if (!engine_aborted(&en_flow_output)) { > engine_set_abort_recompute(true); > - engine_run_done = engine_run(&en_flow_output, > - engine_run_id); > + engine_run(en_nodes, en_count); > } > } else { > engine_set_abort_recompute(false); > - engine_run_done = true; > - engine_run(&en_flow_output, engine_run_id); > + engine_run(en_nodes, en_count); > } > } > stopwatch_stop(CONTROLLER_LOOP_STOPWATCH_NAME, > @@ -2071,7 +2082,7 @@ main(int argc, char *argv[]) > sbrec_meter_table_get(ovnsb_idl_loop.idl), > get_nb_cfg(sbrec_sb_global_table_get( > ovnsb_idl_loop.idl)), > - en_flow_output.changed); > + engine_node_changed(&en_flow_output)); > pinctrl_run(ovnsb_idl_txn, > sbrec_datapath_binding_by_key, > sbrec_port_binding_by_datapath, > @@ -2087,7 +2098,7 @@ main(int argc, char *argv[]) > &ed_runtime_data.local_datapaths, > &ed_runtime_data.active_tunnels); > > - if (en_runtime_data.changed) { > + if (engine_node_changed(&en_runtime_data)) { > update_sb_monitors(ovnsb_idl_loop.idl, chassis, > &ed_runtime_data.local_lports, > &ed_runtime_data.local_datapaths); > @@ -2095,17 +2106,17 @@ main(int argc, char *argv[]) > } > > } > - if (engine_need_run(&en_flow_output, engine_run_id)) { > + if (engine_need_run(en_nodes, en_count)) { > VLOG_DBG("engine did not run, force recompute next time: " > "br_int %p, chassis %p", br_int, chassis); > engine_set_force_recompute(true); > poll_immediate_wake(); > - } else if (!engine_run_done) { > + } else if (engine_aborted(&en_flow_output)) { > VLOG_DBG("engine was aborted, force recompute next time: " > "br_int %p, chassis %p", br_int, chassis); > engine_set_force_recompute(true); > poll_immediate_wake(); > - } else if (!engine_has_run(&en_flow_output, engine_run_id)) { > + } else if (!engine_has_run(&en_flow_output)) { > VLOG_DBG("engine did not run, and it was not needed" > " either: br_int %p, chassis %p", > br_int, chassis); > @@ -2133,8 +2144,7 @@ main(int argc, char *argv[]) > } > } else { > 
VLOG_DBG("Pending_pkt conn but br_int %p or chassis " > - "%p not ready. run-id: %"PRIu64, br_int, > - chassis, engine_run_id); > + "%p not ready.", br_int, chassis); > unixctl_command_reply_error(pending_pkt.conn, > "ovn-controller not ready."); > } > @@ -2183,7 +2193,7 @@ main(int argc, char *argv[]) > } > > engine_set_context(NULL); > - engine_cleanup(&en_flow_output); > + engine_cleanup(en_nodes, en_count); > > /* It's time to exit. Clean up the databases if we are not restarting */ > if (!restart) { > diff --git a/lib/inc-proc-eng.c b/lib/inc-proc-eng.c > index 8a085e2..b438a15 100644 > --- a/lib/inc-proc-eng.c > +++ b/lib/inc-proc-eng.c > @@ -34,6 +34,13 @@ static bool engine_force_recompute = false; > static bool engine_abort_recompute = false; > static const struct engine_context *engine_context; > > +static const char *engine_node_state_name[EN_STATE_MAX] = { > + [EN_STALE] = "Stale", > + [EN_UPDATED] = "Updated", > + [EN_VALID] = "Valid", > + [EN_ABORTED] = "Aborted", > +}; > + > void > engine_set_force_recompute(bool val) > { > @@ -58,26 +65,62 @@ engine_set_context(const struct engine_context *ctx) > engine_context = ctx; > } > > -void > -engine_init(struct engine_node *node) > +/* Builds the topologically sorted 'sorted_nodes' array starting from > + * 'node'. > + */ > +static struct engine_node ** > +engine_topo_sort(struct engine_node *node, struct engine_node **sorted_nodes, > + size_t *n_count, size_t *n_size) > { > + /* It's not so efficient to walk the array of already sorted nodes but > + * we know that sorting is done only once at startup so it's ok for now. 
> + */ > + for (size_t i = 0; i < *n_count; i++) { > + if (sorted_nodes[i] == node) { > + return sorted_nodes; > + } > + } > + > for (size_t i = 0; i < node->n_inputs; i++) { > - engine_init(node->inputs[i].node); > + sorted_nodes = engine_topo_sort(node->inputs[i].node, sorted_nodes, > + n_count, n_size); > } > - if (node->init) { > - node->init(node); > + if (*n_count == *n_size) { > + sorted_nodes = x2nrealloc(sorted_nodes, n_size, sizeof *sorted_nodes); > } > + sorted_nodes[(*n_count)] = node; > + (*n_count)++; > + return sorted_nodes; > +} > + > +struct engine_node ** > +engine_get_nodes(struct engine_node *root_node, size_t *n_count) > +{ > + size_t n_size = 0; > + > + *n_count = 0; > + return engine_topo_sort(root_node, NULL, n_count, &n_size); > } > > void > -engine_cleanup(struct engine_node *node) > +engine_init(struct engine_node **nodes, size_t n_count) > { > - for (size_t i = 0; i < node->n_inputs; i++) { > - engine_cleanup(node->inputs[i].node); > + for (size_t i = 0; i < n_count; i++) { > + if (nodes[i]->init) { > + nodes[i]->init(nodes[i]); > + } > } > - if (node->cleanup) { > - node->cleanup(node); > +} > + > +void > +engine_cleanup(struct engine_node **nodes, size_t n_count) > +{ > + for (size_t i = 0; i < n_count; i++) { > + if (nodes[i]->cleanup) { > + nodes[i]->cleanup(nodes[i]); > + } > } > + free(nodes); > } > > struct engine_node * > @@ -128,16 +171,66 @@ engine_ovsdb_node_add_index(struct engine_node *node, const char *name, > ed->n_indexes ++; > } > > +void > +engine_set_node_state_at(struct engine_node *node, > + enum engine_node_state state, > + const char *where) > +{ > + if (node->state == state) { > + return; > + } > + > + VLOG_DBG("%s: node: %s, old_state %s, new_state %s", > + where, node->name, > + engine_node_state_name[node->state], > + engine_node_state_name[state]); > + > + node->state = state; > +} > + > +static bool > +engine_node_valid(struct engine_node *node) > +{ > + return (node->state == EN_UPDATED || node->state == 
EN_VALID); > +} > + > +bool > +engine_node_changed(struct engine_node *node) > +{ > + return node->state == EN_UPDATED; > +} > + > +bool > +engine_has_run(struct engine_node *root_node) > +{ > + return root_node->state != EN_STALE; > +} > + > bool > -engine_has_run(struct engine_node *node, uint64_t run_id) > +engine_aborted(struct engine_node *node) > { > - return node->run_id == run_id; > + return node->state == EN_ABORTED; > +} > + > +void > +engine_init_run(struct engine_node **nodes, size_t n_count, > + struct engine_node *root_node) > +{ > + /* No need to reinitialize if last run didn't happen. */ > + if (!engine_has_run(root_node)) { > + return; > + } > + > + VLOG_DBG("Initializing new run"); > + for (size_t i = 0; i < n_count; i++) { > + engine_set_node_state(nodes[i], EN_STALE); > + } > } > > /* Do a full recompute (or at least try). If we're not allowed then > * mark the node as "aborted". > */ > -static bool > +static void > engine_recompute(struct engine_node *node, bool forced, bool allowed) > { > VLOG_DBG("node: %s, recompute (%s)", node->name, > @@ -145,12 +238,12 @@ engine_recompute(struct engine_node *node, bool forced, bool allowed) > > if (!allowed) { > VLOG_DBG("node: %s, recompute aborted", node->name); > - return false; > + engine_set_node_state(node, EN_ABORTED); > + return; > } > > + /* Run the node handler which might change state. */ > node->run(node); > - VLOG_DBG("node: %s, changed: %d", node->name, node->changed); > - return true; > } > > /* Return true if the node could be computed without triggerring a full > @@ -161,7 +254,7 @@ engine_compute(struct engine_node *node, bool recompute_allowed) > { > for (size_t i = 0; i < node->n_inputs; i++) { > /* If the input node data changed call its change handler. 
*/ > - if (node->inputs[i].node->changed) { > + if (node->inputs[i].node->state == EN_UPDATED) { > VLOG_DBG("node: %s, handle change for input %s", > node->name, node->inputs[i].node->name); > > @@ -172,57 +265,61 @@ engine_compute(struct engine_node *node, bool recompute_allowed) > VLOG_DBG("node: %s, can't handle change for input %s, " > "fall back to recompute", > node->name, node->inputs[i].node->name); > - if (!engine_recompute(node, false, recompute_allowed)) { > + engine_recompute(node, false, recompute_allowed); > + if (engine_aborted(node)) { > return false; > } > } > } > } > - > return true; > } > > -bool engine_run(struct engine_node *node, uint64_t run_id) > +static void > +engine_run_node(struct engine_node *node) > { > - if (node->run_id == run_id) { > - /* The node was already updated in this run (could be input for > - * multiple other nodes). Stop processing. > - */ > - return true; > - } > - > - /* Initialize the node for this run. */ > - node->run_id = run_id; > - node->changed = false; > - > if (!node->n_inputs) { > + /* Run the node handler which might change state. */ > node->run(node); > - VLOG_DBG("node: %s, changed: %d", node->name, node->changed); > - return true; > + return; > } > > + bool input_stale = false; > for (size_t i = 0; i < node->n_inputs; i++) { > - if (!engine_run(node->inputs[i].node, run_id)) { > - return false; > + if (!engine_node_valid(node->inputs[i].node)) { > + /* If the input node aborted computation, move to EN_ABORTED. > + * This will be propagated to following nodes. > + */ > + if (engine_aborted(node->inputs[i].node)) { > + engine_set_node_state(node, EN_ABORTED); > + } > + > + input_stale = true; > } > } > > - bool need_compute = false; > + /* If at least one input is stale, don't change state. 
*/ > + if (input_stale) { > + return; > + } > > if (engine_force_recompute) { > - return engine_recompute(node, true, !engine_abort_recompute); > + engine_recompute(node, true, !engine_abort_recompute); > + return; > } > > /* If any of the inputs updated data but there is no change_handler, then > * recompute the current node too. > */ > + bool need_compute = false; > for (size_t i = 0; i < node->n_inputs; i++) { > - if (node->inputs[i].node->changed) { > + if (node->inputs[i].node->state == EN_UPDATED) { > need_compute = true; > > /* Trigger a recompute if we don't have a change handler. */ > if (!node->inputs[i].change_handler) { > - return engine_recompute(node, false, !engine_abort_recompute); > + engine_recompute(node, false, !engine_abort_recompute); > + return; > } > } > } > @@ -231,33 +328,42 @@ bool engine_run(struct engine_node *node, uint64_t run_id) > /* If we couldn't compute the node we either aborted or triggered > * a full recompute. In any case, stop processing. > */ > - return engine_compute(node, !engine_abort_recompute); > + if (!engine_compute(node, !engine_abort_recompute)) { > + return; > + } > } > > - VLOG_DBG("node: %s, changed: %d", node->name, node->changed); > - return true; > + /* If we reached this point, either the node was updated or its state is > + * still valid. 
> + */ > + if (!engine_node_changed(node)) { > + engine_set_node_state(node, EN_VALID); > + } > } > > -bool > -engine_need_run(struct engine_node *node, uint64_t run_id) > +void > +engine_run(struct engine_node **nodes, size_t n_count) > { > - size_t i; > - > - if (node->run_id == run_id) { > - return false; > + for (size_t i = 0; i < n_count; i++) { > + engine_run_node(nodes[i]); > } > +} > > - if (!node->n_inputs) { > - node->run(node); > - VLOG_DBG("input node: %s, changed: %d", node->name, node->changed); > - return node->changed; > - } > +bool > +engine_need_run(struct engine_node **nodes, size_t n_count) > +{ > + for (size_t i = 0; i < n_count; i++) { > + /* Check only leaf nodes. */ > + if (nodes[i]->n_inputs) { > + continue; > + } > > - for (i = 0; i < node->n_inputs; i++) { > - if (engine_need_run(node->inputs[i].node, run_id)) { > + nodes[i]->run(nodes[i]); > + VLOG_DBG("input node: %s, state: %s", nodes[i]->name, > + engine_node_state_name[nodes[i]->state]); > + if (nodes[i]->state == EN_UPDATED) { > return true; > } > } > - > return false; > } > diff --git a/lib/inc-proc-eng.h b/lib/inc-proc-eng.h > index abd41b2..9a35f1f 100644 > --- a/lib/inc-proc-eng.h > +++ b/lib/inc-proc-eng.h > @@ -82,10 +82,21 @@ struct engine_node_input { > bool (*change_handler)(struct engine_node *node); > }; > > -struct engine_node { > - /* A unique id to distinguish each iteration of the engine_run(). */ > - uint64_t run_id; > +enum engine_node_state { > + EN_STALE, /* Data in the node is not up to date with the DB. */ > + EN_UPDATED, /* Data in the node is valid but was updated during the > + * last run. > + */ > + EN_VALID, /* Data in the node is valid and didn't change during the > + * last run. > + */ > + EN_ABORTED, /* During the last run, processing was aborted for > + * this node. > + */ > + EN_STATE_MAX, > +}; > > +struct engine_node { > /* A unique name for each node. */ > char *name; > > @@ -102,8 +113,8 @@ struct engine_node { > * node. 
*/ > void *data; > > - /* Whether the data changed in the last engine run. */ > - bool changed; > + /* State of the node after the last engine run. */ > + enum engine_node_state state; > > /* Method to initialize data. It may be NULL. */ > void (*init)(struct engine_node *); > @@ -116,23 +127,35 @@ struct engine_node { > void (*run)(struct engine_node *); > }; > > -/* Initialize the data for the engine nodes recursively. It calls each node's > +/* Return the array of topologically sorted nodes when starting from > + * 'root_node'. Stores the number of nodes in 'n_count'. > + * It should be called before the main loop. > + */ > +struct engine_node **engine_get_nodes(struct engine_node *root_node, > + size_t *n_count); > + > +/* Initialize the data for the engine nodes. It calls each node's > * init() method if not NULL. It should be called before the main loop. */ > -void engine_init(struct engine_node *); > +void engine_init(struct engine_node **nodes, size_t n_count); > + > +/* Initialize the engine nodes for a new run. It should be called in the > + * main processing loop before every potential engine_run(). > + */ > +void engine_init_run(struct engine_node **nodes, size_t n_count, > + struct engine_node *root_node); > > /* Execute the processing recursively, which should be called in the main > - * loop. Returns true if the execution is compelte, false if it is aborted, > - * which could happen when engine_abort_recompute is set. */ > -bool engine_run(struct engine_node *, uint64_t run_id); > + * loop. Updates the engine node's states accordingly. > + */ > +void engine_run(struct engine_node **nodes, size_t n_count); > > -/* Clean up the data for the engine nodes recursively. It calls each node's > +/* Clean up the data for the engine nodes. It calls each node's > * cleanup() method if not NULL. It should be called before the program > * terminates. 
*/ > -void engine_cleanup(struct engine_node *); > +void engine_cleanup(struct engine_node **nodes, size_t n_count); > > /* Check if engine needs to run but didn't. */ > -bool > -engine_need_run(struct engine_node *, uint64_t run_id); > +bool engine_need_run(struct engine_node **nodes, size_t n_count); > > /* Get the input node with <name> for <node> */ > struct engine_node * engine_get_input(const char *input_name, > @@ -159,8 +182,22 @@ const struct engine_context * engine_get_context(void); > > void engine_set_context(const struct engine_context *); > > -/* Return true if the engine has run for 'node' in the 'run_id' iteration. */ > -bool engine_has_run(struct engine_node *node, uint64_t run_id); > +void engine_set_node_state_at(struct engine_node *node, > + enum engine_node_state state, > + const char *where); > + > +/* Return true if during the last iteration the node's data was updated. */ > +bool engine_node_changed(struct engine_node *node); > + > +/* Return true if the engine has run for 'node' in the last iteration. */ > +bool engine_has_run(struct engine_node *node); > + > +/* Returns true if during the last engine run we had to abort processing. */ > +bool engine_aborted(struct engine_node *node); > + > +/* Set the state of the node and log changes. 
*/ > +#define engine_set_node_state(node, state) \ > + engine_set_node_state_at(node, state, OVS_SOURCE_LOCATOR) > > struct ed_ovsdb_index { > const char *name; > @@ -187,6 +224,7 @@ void engine_ovsdb_node_add_index(struct engine_node *, const char *name, > struct engine_node en_##NAME = { \ > .name = NAME_STR, \ > .data = &ed_##NAME, \ > + .state = EN_STALE, \ > .init = en_##NAME##_init, \ > .run = en_##NAME##_run, \ > .cleanup = en_##NAME##_cleanup, \ > @@ -201,10 +239,10 @@ en_##DB_NAME##_##TBL_NAME##_run(struct engine_node *node) \ > const struct DB_NAME##rec_##TBL_NAME##_table *table = \ > EN_OVSDB_GET(node); \ > if (DB_NAME##rec_##TBL_NAME##_table_track_get_first(table)) { \ > - node->changed = true; \ > + engine_set_node_state(node, EN_UPDATED); \ > return; \ > } \ > - node->changed = false; \ > + engine_set_node_state(node, EN_VALID); \ > } \ > static void (*en_##DB_NAME##_##TBL_NAME##_init)(struct engine_node *node) \ > = NULL; \ > > _______________________________________________ > dev mailing list > dev@openvswitch.org > https://mail.openvswitch.org/mailman/listinfo/ovs-dev >
On Mon, Nov 18, 2019 at 8:38 AM Numan Siddique <numans@ovn.org> wrote: > > Thanks for this series Dumitru. This is really helpful. > > Few comments below. > > > On Thu, Nov 14, 2019 at 10:39 PM Dumitru Ceara <dceara@redhat.com> wrote: > > > > This commit transforms the 'changed' field in struct engine_node in a > > 'state' field. Possible node states are: > > - "Stale": data in the node is not up to date with the DB. > > - "Updated": data in the node is valid but was updated during > > the last run of the engine. > > - "Valid": data in the node is valid and didn't change during > > the last run of the engine. > > - "Aborted": during the last run, processing was aborted for > > this node. > > > > This commit also further refactors the I-P engine: > > - instead of recursively performing all the engine processing a > > preprocessing stage is added (engine_get_nodes()) before the main processing > > loop is executed in order to topologically sort nodes in the engine such > > that all inputs of a given node appear in the sorted array before the node > > itself. This simplifies a bit the code in engine_run(). > > - remove the need for using an engine_run_id by using the newly added states. > > > > Signed-off-by: Dumitru Ceara <dceara@redhat.com> > > --- > > After applying this patch I notice that adding a logical switch or > logical router is resulting > the engine_run() to be called twice i.e the function > engine_need_run() always returns true when I > run "ovn-nbctl ls-add sw0" and engine_run() is called again. > > If you enable debug and add a logical switch you will see [1] to be > hit all the time which is not the case > before this patch. > > Thanks > Numan Nice catch, sorry about that. I forgot to add a check to see if the engine was run on the root node (en_flow_output) in the iteration. I'll fix it in v4. 
Thanks, Dumitru > > > > controller/ovn-controller.c | 88 ++++++++++------- > > lib/inc-proc-eng.c | 218 ++++++++++++++++++++++++++++++++----------- > > lib/inc-proc-eng.h | 74 +++++++++++---- > > 3 files changed, 267 insertions(+), 113 deletions(-) > > > > diff --git a/controller/ovn-controller.c b/controller/ovn-controller.c > > index 3922f3d..4f8ceae 100644 > > --- a/controller/ovn-controller.c > > +++ b/controller/ovn-controller.c > > @@ -758,10 +758,10 @@ en_ofctrl_is_connected_run(struct engine_node *node) > > (struct ed_type_ofctrl_is_connected *)node->data; > > if (data->connected != ofctrl_is_connected()) { > > data->connected = !data->connected; > > - node->changed = true; > > + engine_set_node_state(node, EN_UPDATED); > > return; > > } > > - node->changed = false; > > + engine_set_node_state(node, EN_VALID); > > } > > > > struct ed_type_addr_sets { > > @@ -811,7 +811,7 @@ en_addr_sets_run(struct engine_node *node) > > addr_sets_init(as_table, &as->addr_sets); > > > > as->change_tracked = false; > > - node->changed = true; > > + engine_set_node_state(node, EN_UPDATED); > > } > > > > static bool > > @@ -830,11 +830,14 @@ addr_sets_sb_address_set_handler(struct engine_node *node) > > addr_sets_update(as_table, &as->addr_sets, &as->new, > > &as->deleted, &as->updated); > > > > - node->changed = !sset_is_empty(&as->new) || !sset_is_empty(&as->deleted) > > - || !sset_is_empty(&as->updated); > > + if (!sset_is_empty(&as->new) || !sset_is_empty(&as->deleted) || > > + !sset_is_empty(&as->updated)) { > > + engine_set_node_state(node, EN_UPDATED); > > + } else { > > + engine_set_node_state(node, EN_VALID); > > + } > > > > as->change_tracked = true; > > - node->changed = true; > > return true; > > } > > > > @@ -885,7 +888,7 @@ en_port_groups_run(struct engine_node *node) > > port_groups_init(pg_table, &pg->port_groups); > > > > pg->change_tracked = false; > > - node->changed = true; > > + engine_set_node_state(node, EN_UPDATED); > > } > > > > static bool > > @@ 
-904,11 +907,14 @@ port_groups_sb_port_group_handler(struct engine_node *node) > > port_groups_update(pg_table, &pg->port_groups, &pg->new, > > &pg->deleted, &pg->updated); > > > > - node->changed = !sset_is_empty(&pg->new) || !sset_is_empty(&pg->deleted) > > - || !sset_is_empty(&pg->updated); > > + if (!sset_is_empty(&pg->new) || !sset_is_empty(&pg->deleted) || > > + !sset_is_empty(&pg->updated)) { > > + engine_set_node_state(node, EN_UPDATED); > > + } else { > > + engine_set_node_state(node, EN_VALID); > > + } > > > > pg->change_tracked = true; > > - node->changed = true; > > return true; > > } > > > > @@ -1091,7 +1097,7 @@ en_runtime_data_run(struct engine_node *node) > > update_ct_zones(local_lports, local_datapaths, ct_zones, > > ct_zone_bitmap, pending_ct_zones); > > > > - node->changed = true; > > + engine_set_node_state(node, EN_UPDATED); > > } > > > > static bool > > @@ -1157,10 +1163,10 @@ en_mff_ovn_geneve_run(struct engine_node *node) > > enum mf_field_id mff_ovn_geneve = ofctrl_get_mf_field_id(); > > if (data->mff_ovn_geneve != mff_ovn_geneve) { > > data->mff_ovn_geneve = mff_ovn_geneve; > > - node->changed = true; > > + engine_set_node_state(node, EN_UPDATED); > > return; > > } > > - node->changed = false; > > + engine_set_node_state(node, EN_VALID); > > } > > > > struct ed_type_flow_output { > > @@ -1322,7 +1328,7 @@ en_flow_output_run(struct engine_node *node) > > active_tunnels, > > flow_table); > > > > - node->changed = true; > > + engine_set_node_state(node, EN_UPDATED); > > } > > > > static bool > > @@ -1404,7 +1410,7 @@ flow_output_sb_logical_flow_handler(struct engine_node *node) > > flow_table, group_table, meter_table, lfrr, > > conj_id_ofs); > > > > - node->changed = true; > > + engine_set_node_state(node, EN_UPDATED); > > return handled; > > } > > > > @@ -1427,7 +1433,7 @@ flow_output_sb_mac_binding_handler(struct engine_node *node) > > lflow_handle_changed_neighbors(sbrec_port_binding_by_name, > > mac_binding_table, flow_table); > > > > - 
node->changed = true; > > + engine_set_node_state(node, EN_UPDATED); > > return true; > > } > > > > @@ -1531,7 +1537,7 @@ flow_output_sb_port_binding_handler(struct engine_node *node) > > chassis, ct_zones, local_datapaths, > > active_tunnels, flow_table); > > > > - node->changed = true; > > + engine_set_node_state(node, EN_UPDATED); > > return true; > > } > > > > @@ -1580,7 +1586,7 @@ flow_output_sb_multicast_group_handler(struct engine_node *node) > > mff_ovn_geneve, chassis, ct_zones, local_datapaths, > > flow_table); > > > > - node->changed = true; > > + engine_set_node_state(node, EN_UPDATED); > > return true; > > > > } > > @@ -1694,7 +1700,9 @@ _flow_output_resource_ref_handler(struct engine_node *node, > > conj_id_ofs, &changed)) { > > return false; > > } > > - node->changed = changed || node->changed; > > + if (changed) { > > + engine_set_node_state(node, EN_UPDATED); > > + } > > } > > SSET_FOR_EACH (ref_name, updated) { > > if (!lflow_handle_changed_ref(ref_type, ref_name, > > @@ -1707,7 +1715,9 @@ _flow_output_resource_ref_handler(struct engine_node *node, > > conj_id_ofs, &changed)) { > > return false; > > } > > - node->changed = changed || node->changed; > > + if (changed) { > > + engine_set_node_state(node, EN_UPDATED); > > + } > > } > > SSET_FOR_EACH (ref_name, new) { > > if (!lflow_handle_changed_ref(ref_type, ref_name, > > @@ -1720,7 +1730,9 @@ _flow_output_resource_ref_handler(struct engine_node *node, > > conj_id_ofs, &changed)) { > > return false; > > } > > - node->changed = changed || node->changed; > > + if (changed) { > > + engine_set_node_state(node, EN_UPDATED); > > + } > > } > > > > return true; > > @@ -1922,7 +1934,11 @@ main(int argc, char *argv[]) > > engine_add_input(&en_runtime_data, &en_sb_port_binding, > > runtime_data_sb_port_binding_handler); > > > > - engine_init(&en_flow_output); > > + /* Get the sorted engine nodes to be used for every engine run. 
*/ > > + size_t en_count = 0; > > + struct engine_node **en_nodes = engine_get_nodes(&en_flow_output, > > + &en_count); > > + engine_init(en_nodes, en_count); > > > > ofctrl_init(&ed_flow_output.group_table, > > &ed_flow_output.meter_table, > > @@ -1941,9 +1957,6 @@ main(int argc, char *argv[]) > > unixctl_command_register("inject-pkt", "MICROFLOW", 1, 1, inject_pkt, > > &pending_pkt); > > > > - uint64_t engine_run_id = 0; > > - bool engine_run_done = true; > > - > > unsigned int ovs_cond_seqno = UINT_MAX; > > unsigned int ovnsb_cond_seqno = UINT_MAX; > > > > @@ -1951,7 +1964,7 @@ main(int argc, char *argv[]) > > exiting = false; > > restart = false; > > while (!exiting) { > > - engine_run_id++; > > + engine_init_run(en_nodes, en_count, &en_flow_output); > > > > update_sb_db(ovs_idl_loop.idl, ovnsb_idl_loop.idl); > > update_ssl_config(ovsrec_ssl_table_get(ovs_idl_loop.idl)); > > @@ -2044,15 +2057,13 @@ main(int argc, char *argv[]) > > * this round of engine_run and continue processing > > * acculated changes incrementally later when > > * ofctrl_can_put() returns true. 
*/ > > - if (engine_run_done) { > > + if (!engine_aborted(&en_flow_output)) { > > engine_set_abort_recompute(true); > > - engine_run_done = engine_run(&en_flow_output, > > - engine_run_id); > > + engine_run(en_nodes, en_count); > > } > > } else { > > engine_set_abort_recompute(false); > > - engine_run_done = true; > > - engine_run(&en_flow_output, engine_run_id); > > + engine_run(en_nodes, en_count); > > } > > } > > stopwatch_stop(CONTROLLER_LOOP_STOPWATCH_NAME, > > @@ -2071,7 +2082,7 @@ main(int argc, char *argv[]) > > sbrec_meter_table_get(ovnsb_idl_loop.idl), > > get_nb_cfg(sbrec_sb_global_table_get( > > ovnsb_idl_loop.idl)), > > - en_flow_output.changed); > > + engine_node_changed(&en_flow_output)); > > pinctrl_run(ovnsb_idl_txn, > > sbrec_datapath_binding_by_key, > > sbrec_port_binding_by_datapath, > > @@ -2087,7 +2098,7 @@ main(int argc, char *argv[]) > > &ed_runtime_data.local_datapaths, > > &ed_runtime_data.active_tunnels); > > > > - if (en_runtime_data.changed) { > > + if (engine_node_changed(&en_runtime_data)) { > > update_sb_monitors(ovnsb_idl_loop.idl, chassis, > > &ed_runtime_data.local_lports, > > &ed_runtime_data.local_datapaths); > > @@ -2095,17 +2106,17 @@ main(int argc, char *argv[]) > > } > > > > } > > - if (engine_need_run(&en_flow_output, engine_run_id)) { > > + if (engine_need_run(en_nodes, en_count)) { > > VLOG_DBG("engine did not run, force recompute next time: " > > "br_int %p, chassis %p", br_int, chassis); > > engine_set_force_recompute(true); > > poll_immediate_wake(); > > - } else if (!engine_run_done) { > > + } else if (engine_aborted(&en_flow_output)) { > > VLOG_DBG("engine was aborted, force recompute next time: " > > "br_int %p, chassis %p", br_int, chassis); > > engine_set_force_recompute(true); > > poll_immediate_wake(); > > - } else if (!engine_has_run(&en_flow_output, engine_run_id)) { > > + } else if (!engine_has_run(&en_flow_output)) { > > VLOG_DBG("engine did not run, and it was not needed" > > " either: br_int %p, chassis 
%p", > > br_int, chassis); > > @@ -2133,8 +2144,7 @@ main(int argc, char *argv[]) > > } > > } else { > > VLOG_DBG("Pending_pkt conn but br_int %p or chassis " > > - "%p not ready. run-id: %"PRIu64, br_int, > > - chassis, engine_run_id); > > + "%p not ready.", br_int, chassis); > > unixctl_command_reply_error(pending_pkt.conn, > > "ovn-controller not ready."); > > } > > @@ -2183,7 +2193,7 @@ main(int argc, char *argv[]) > > } > > > > engine_set_context(NULL); > > - engine_cleanup(&en_flow_output); > > + engine_cleanup(en_nodes, en_count); > > > > /* It's time to exit. Clean up the databases if we are not restarting */ > > if (!restart) { > > diff --git a/lib/inc-proc-eng.c b/lib/inc-proc-eng.c > > index 8a085e2..b438a15 100644 > > --- a/lib/inc-proc-eng.c > > +++ b/lib/inc-proc-eng.c > > @@ -34,6 +34,13 @@ static bool engine_force_recompute = false; > > static bool engine_abort_recompute = false; > > static const struct engine_context *engine_context; > > > > +static const char *engine_node_state_name[EN_STATE_MAX] = { > > + [EN_STALE] = "Stale", > > + [EN_UPDATED] = "Updated", > > + [EN_VALID] = "Valid", > > + [EN_ABORTED] = "Aborted", > > +}; > > + > > void > > engine_set_force_recompute(bool val) > > { > > @@ -58,26 +65,62 @@ engine_set_context(const struct engine_context *ctx) > > engine_context = ctx; > > } > > > > -void > > -engine_init(struct engine_node *node) > > +/* Builds the topologically sorted 'sorted_nodes' array starting from > > + * 'node'. > > + */ > > +static struct engine_node ** > > +engine_topo_sort(struct engine_node *node, struct engine_node **sorted_nodes, > > + size_t *n_count, size_t *n_size) > > { > > + /* It's not so efficient to walk the array of already sorted nodes but > > + * we know that sorting is done only once at startup so it's ok for now. 
> > + */ > > + for (size_t i = 0; i < *n_count; i++) { > > + if (sorted_nodes[i] == node) { > > + return sorted_nodes; > > + } > > + } > > + > > for (size_t i = 0; i < node->n_inputs; i++) { > > - engine_init(node->inputs[i].node); > > + sorted_nodes = engine_topo_sort(node->inputs[i].node, sorted_nodes, > > + n_count, n_size); > > } > > - if (node->init) { > > - node->init(node); > > + if (*n_count == *n_size) { > > + sorted_nodes = x2nrealloc(sorted_nodes, n_size, sizeof *sorted_nodes); > > } > > + sorted_nodes[(*n_count)] = node; > > + (*n_count)++; > > + return sorted_nodes; > > +} > > + > > +struct engine_node ** > > +engine_get_nodes(struct engine_node *root_node, size_t *n_count) > > +{ > > + size_t n_size = 0; > > + > > + *n_count = 0; > > + return engine_topo_sort(root_node, NULL, n_count, &n_size); > > } > > > > void > > -engine_cleanup(struct engine_node *node) > > +engine_init(struct engine_node **nodes, size_t n_count) > > { > > - for (size_t i = 0; i < node->n_inputs; i++) { > > - engine_cleanup(node->inputs[i].node); > > + for (size_t i = 0; i < n_count; i++) { > > + if (nodes[i]->init) { > > + nodes[i]->init(nodes[i]); > > + } > > } > > - if (node->cleanup) { > > - node->cleanup(node); > > +} > > + > > +void > > +engine_cleanup(struct engine_node **nodes, size_t n_count) > > +{ > > + for (size_t i = 0; i < n_count; i++) { > > + if (nodes[i]->cleanup) { > > + nodes[i]->cleanup(nodes[i]); > > + } > > } > > + free(nodes); > > } > > > > struct engine_node * > > @@ -128,16 +171,66 @@ engine_ovsdb_node_add_index(struct engine_node *node, const char *name, > > ed->n_indexes ++; > > } > > > > +void > > +engine_set_node_state_at(struct engine_node *node, > > + enum engine_node_state state, > > + const char *where) > > +{ > > + if (node->state == state) { > > + return; > > + } > > + > > + VLOG_DBG("%s: node: %s, old_state %s, new_state %s", > > + where, node->name, > > + engine_node_state_name[node->state], > > + engine_node_state_name[state]); > > + > > + 
node->state = state; > > +} > > + > > +static bool > > +engine_node_valid(struct engine_node *node) > > +{ > > + return (node->state == EN_UPDATED || node->state == EN_VALID); > > +} > > + > > +bool > > +engine_node_changed(struct engine_node *node) > > +{ > > + return node->state == EN_UPDATED; > > +} > > + > > +bool > > +engine_has_run(struct engine_node *root_node) > > +{ > > + return root_node->state != EN_STALE; > > +} > > + > > bool > > -engine_has_run(struct engine_node *node, uint64_t run_id) > > +engine_aborted(struct engine_node *node) > > { > > - return node->run_id == run_id; > > + return node->state == EN_ABORTED; > > +} > > + > > +void > > +engine_init_run(struct engine_node **nodes, size_t n_count, > > + struct engine_node *root_node) > > +{ > > + /* No need to reinitialize if last run didn't happen. */ > > + if (!engine_has_run(root_node)) { > > + return; > > + } > > + > > + VLOG_DBG("Initializing new run"); > > + for (size_t i = 0; i < n_count; i++) { > > + engine_set_node_state(nodes[i], EN_STALE); > > + } > > } > > > > /* Do a full recompute (or at least try). If we're not allowed then > > * mark the node as "aborted". > > */ > > -static bool > > +static void > > engine_recompute(struct engine_node *node, bool forced, bool allowed) > > { > > VLOG_DBG("node: %s, recompute (%s)", node->name, > > @@ -145,12 +238,12 @@ engine_recompute(struct engine_node *node, bool forced, bool allowed) > > > > if (!allowed) { > > VLOG_DBG("node: %s, recompute aborted", node->name); > > - return false; > > + engine_set_node_state(node, EN_ABORTED); > > + return; > > } > > > > + /* Run the node handler which might change state. 
*/ > > node->run(node); > > - VLOG_DBG("node: %s, changed: %d", node->name, node->changed); > > - return true; > > } > > > > /* Return true if the node could be computed without triggerring a full > > @@ -161,7 +254,7 @@ engine_compute(struct engine_node *node, bool recompute_allowed) > > { > > for (size_t i = 0; i < node->n_inputs; i++) { > > /* If the input node data changed call its change handler. */ > > - if (node->inputs[i].node->changed) { > > + if (node->inputs[i].node->state == EN_UPDATED) { > > VLOG_DBG("node: %s, handle change for input %s", > > node->name, node->inputs[i].node->name); > > > > @@ -172,57 +265,61 @@ engine_compute(struct engine_node *node, bool recompute_allowed) > > VLOG_DBG("node: %s, can't handle change for input %s, " > > "fall back to recompute", > > node->name, node->inputs[i].node->name); > > - if (!engine_recompute(node, false, recompute_allowed)) { > > + engine_recompute(node, false, recompute_allowed); > > + if (engine_aborted(node)) { > > return false; > > } > > } > > } > > } > > - > > return true; > > } > > > > -bool engine_run(struct engine_node *node, uint64_t run_id) > > +static void > > +engine_run_node(struct engine_node *node) > > { > > - if (node->run_id == run_id) { > > - /* The node was already updated in this run (could be input for > > - * multiple other nodes). Stop processing. > > - */ > > - return true; > > - } > > - > > - /* Initialize the node for this run. */ > > - node->run_id = run_id; > > - node->changed = false; > > - > > if (!node->n_inputs) { > > + /* Run the node handler which might change state. */ > > node->run(node); > > - VLOG_DBG("node: %s, changed: %d", node->name, node->changed); > > - return true; > > + return; > > } > > > > + bool input_stale = false; > > for (size_t i = 0; i < node->n_inputs; i++) { > > - if (!engine_run(node->inputs[i].node, run_id)) { > > - return false; > > + if (!engine_node_valid(node->inputs[i].node)) { > > + /* If the input node aborted computation, move to EN_ABORTED. 
> > + * This will be propagated to following nodes. > > + */ > > + if (engine_aborted(node->inputs[i].node)) { > > + engine_set_node_state(node, EN_ABORTED); > > + } > > + > > + input_stale = true; > > } > > } > > > > - bool need_compute = false; > > + /* If at least one input is stale, don't change state. */ > > + if (input_stale) { > > + return; > > + } > > > > if (engine_force_recompute) { > > - return engine_recompute(node, true, !engine_abort_recompute); > > + engine_recompute(node, true, !engine_abort_recompute); > > + return; > > } > > > > /* If any of the inputs updated data but there is no change_handler, then > > * recompute the current node too. > > */ > > + bool need_compute = false; > > for (size_t i = 0; i < node->n_inputs; i++) { > > - if (node->inputs[i].node->changed) { > > + if (node->inputs[i].node->state == EN_UPDATED) { > > need_compute = true; > > > > /* Trigger a recompute if we don't have a change handler. */ > > if (!node->inputs[i].change_handler) { > > - return engine_recompute(node, false, !engine_abort_recompute); > > + engine_recompute(node, false, !engine_abort_recompute); > > + return; > > } > > } > > } > > @@ -231,33 +328,42 @@ bool engine_run(struct engine_node *node, uint64_t run_id) > > /* If we couldn't compute the node we either aborted or triggered > > * a full recompute. In any case, stop processing. > > */ > > - return engine_compute(node, !engine_abort_recompute); > > + if (!engine_compute(node, !engine_abort_recompute)) { > > + return; > > + } > > } > > > > - VLOG_DBG("node: %s, changed: %d", node->name, node->changed); > > - return true; > > + /* If we reached this point, either the node was updated or its state is > > + * still valid. 
> > + */ > > + if (!engine_node_changed(node)) { > > + engine_set_node_state(node, EN_VALID); > > + } > > } > > > > -bool > > -engine_need_run(struct engine_node *node, uint64_t run_id) > > +void > > +engine_run(struct engine_node **nodes, size_t n_count) > > { > > - size_t i; > > - > > - if (node->run_id == run_id) { > > - return false; > > + for (size_t i = 0; i < n_count; i++) { > > + engine_run_node(nodes[i]); > > } > > +} > > > > - if (!node->n_inputs) { > > - node->run(node); > > - VLOG_DBG("input node: %s, changed: %d", node->name, node->changed); > > - return node->changed; > > - } > > +bool > > +engine_need_run(struct engine_node **nodes, size_t n_count) > > +{ > > + for (size_t i = 0; i < n_count; i++) { > > + /* Check only leaf nodes. */ > > + if (nodes[i]->n_inputs) { > > + continue; > > + } > > > > - for (i = 0; i < node->n_inputs; i++) { > > - if (engine_need_run(node->inputs[i].node, run_id)) { > > + nodes[i]->run(nodes[i]); > > + VLOG_DBG("input node: %s, state: %s", nodes[i]->name, > > + engine_node_state_name[nodes[i]->state]); > > + if (nodes[i]->state == EN_UPDATED) { > > return true; > > } > > } > > - > > return false; > > } > > diff --git a/lib/inc-proc-eng.h b/lib/inc-proc-eng.h > > index abd41b2..9a35f1f 100644 > > --- a/lib/inc-proc-eng.h > > +++ b/lib/inc-proc-eng.h > > @@ -82,10 +82,21 @@ struct engine_node_input { > > bool (*change_handler)(struct engine_node *node); > > }; > > > > -struct engine_node { > > - /* A unique id to distinguish each iteration of the engine_run(). */ > > - uint64_t run_id; > > +enum engine_node_state { > > + EN_STALE, /* Data in the node is not up to date with the DB. */ > > + EN_UPDATED, /* Data in the node is valid but was updated during the > > + * last run. > > + */ > > + EN_VALID, /* Data in the node is valid and didn't change during the > > + * last run. > > + */ > > + EN_ABORTED, /* During the last run, processing was aborted for > > + * this node. 
> > + */ > > + EN_STATE_MAX, > > +}; > > > > +struct engine_node { > > /* A unique name for each node. */ > > char *name; > > > > @@ -102,8 +113,8 @@ struct engine_node { > > * node. */ > > void *data; > > > > - /* Whether the data changed in the last engine run. */ > > - bool changed; > > + /* State of the node after the last engine run. */ > > + enum engine_node_state state; > > > > /* Method to initialize data. It may be NULL. */ > > void (*init)(struct engine_node *); > > @@ -116,23 +127,35 @@ struct engine_node { > > void (*run)(struct engine_node *); > > }; > > > > -/* Initialize the data for the engine nodes recursively. It calls each node's > > +/* Return the array of topologically sorted nodes when starting from > > + * 'root_node'. Stores the number of nodes in 'n_count'. > > + * It should be called before the main loop. > > + */ > > +struct engine_node **engine_get_nodes(struct engine_node *root_node, > > + size_t *n_count); > > + > > +/* Initialize the data for the engine nodes. It calls each node's > > * init() method if not NULL. It should be called before the main loop. */ > > -void engine_init(struct engine_node *); > > +void engine_init(struct engine_node **nodes, size_t n_count); > > + > > +/* Initialize the engine nodes for a new run. It should be called in the > > + * main processing loop before every potential engine_run(). > > + */ > > +void engine_init_run(struct engine_node **nodes, size_t n_count, > > + struct engine_node *root_node); > > > > /* Execute the processing recursively, which should be called in the main > > - * loop. Returns true if the execution is compelte, false if it is aborted, > > - * which could happen when engine_abort_recompute is set. */ > > -bool engine_run(struct engine_node *, uint64_t run_id); > > + * loop. Updates the engine node's states accordingly. > > + */ > > +void engine_run(struct engine_node **nodes, size_t n_count); > > > > -/* Clean up the data for the engine nodes recursively. 
It calls each node's > > +/* Clean up the data for the engine nodes. It calls each node's > > * cleanup() method if not NULL. It should be called before the program > > * terminates. */ > > -void engine_cleanup(struct engine_node *); > > +void engine_cleanup(struct engine_node **nodes, size_t n_count); > > > > /* Check if engine needs to run but didn't. */ > > -bool > > -engine_need_run(struct engine_node *, uint64_t run_id); > > +bool engine_need_run(struct engine_node **nodes, size_t n_count); > > > > /* Get the input node with <name> for <node> */ > > struct engine_node * engine_get_input(const char *input_name, > > @@ -159,8 +182,22 @@ const struct engine_context * engine_get_context(void); > > > > void engine_set_context(const struct engine_context *); > > > > -/* Return true if the engine has run for 'node' in the 'run_id' iteration. */ > > -bool engine_has_run(struct engine_node *node, uint64_t run_id); > > +void engine_set_node_state_at(struct engine_node *node, > > + enum engine_node_state state, > > + const char *where); > > + > > +/* Return true if during the last iteration the node's data was updated. */ > > +bool engine_node_changed(struct engine_node *node); > > + > > +/* Return true if the engine has run for 'node' in the last iteration. */ > > +bool engine_has_run(struct engine_node *node); > > + > > +/* Returns true if during the last engine run we had to abort processing. */ > > +bool engine_aborted(struct engine_node *node); > > + > > +/* Set the state of the node and log changes. 
*/ > > +#define engine_set_node_state(node, state) \ > > + engine_set_node_state_at(node, state, OVS_SOURCE_LOCATOR) > > > > struct ed_ovsdb_index { > > const char *name; > > @@ -187,6 +224,7 @@ void engine_ovsdb_node_add_index(struct engine_node *, const char *name, > > struct engine_node en_##NAME = { \ > > .name = NAME_STR, \ > > .data = &ed_##NAME, \ > > + .state = EN_STALE, \ > > .init = en_##NAME##_init, \ > > .run = en_##NAME##_run, \ > > .cleanup = en_##NAME##_cleanup, \ > > @@ -201,10 +239,10 @@ en_##DB_NAME##_##TBL_NAME##_run(struct engine_node *node) \ > > const struct DB_NAME##rec_##TBL_NAME##_table *table = \ > > EN_OVSDB_GET(node); \ > > if (DB_NAME##rec_##TBL_NAME##_table_track_get_first(table)) { \ > > - node->changed = true; \ > > + engine_set_node_state(node, EN_UPDATED); \ > > return; \ > > } \ > > - node->changed = false; \ > > + engine_set_node_state(node, EN_VALID); \ > > } \ > > static void (*en_##DB_NAME##_##TBL_NAME##_init)(struct engine_node *node) \ > > = NULL; \ > > > > _______________________________________________ > > dev mailing list > > dev@openvswitch.org > > https://mail.openvswitch.org/mailman/listinfo/ovs-dev > > >
diff --git a/controller/ovn-controller.c b/controller/ovn-controller.c index 3922f3d..4f8ceae 100644 --- a/controller/ovn-controller.c +++ b/controller/ovn-controller.c @@ -758,10 +758,10 @@ en_ofctrl_is_connected_run(struct engine_node *node) (struct ed_type_ofctrl_is_connected *)node->data; if (data->connected != ofctrl_is_connected()) { data->connected = !data->connected; - node->changed = true; + engine_set_node_state(node, EN_UPDATED); return; } - node->changed = false; + engine_set_node_state(node, EN_VALID); } struct ed_type_addr_sets { @@ -811,7 +811,7 @@ en_addr_sets_run(struct engine_node *node) addr_sets_init(as_table, &as->addr_sets); as->change_tracked = false; - node->changed = true; + engine_set_node_state(node, EN_UPDATED); } static bool @@ -830,11 +830,14 @@ addr_sets_sb_address_set_handler(struct engine_node *node) addr_sets_update(as_table, &as->addr_sets, &as->new, &as->deleted, &as->updated); - node->changed = !sset_is_empty(&as->new) || !sset_is_empty(&as->deleted) - || !sset_is_empty(&as->updated); + if (!sset_is_empty(&as->new) || !sset_is_empty(&as->deleted) || + !sset_is_empty(&as->updated)) { + engine_set_node_state(node, EN_UPDATED); + } else { + engine_set_node_state(node, EN_VALID); + } as->change_tracked = true; - node->changed = true; return true; } @@ -885,7 +888,7 @@ en_port_groups_run(struct engine_node *node) port_groups_init(pg_table, &pg->port_groups); pg->change_tracked = false; - node->changed = true; + engine_set_node_state(node, EN_UPDATED); } static bool @@ -904,11 +907,14 @@ port_groups_sb_port_group_handler(struct engine_node *node) port_groups_update(pg_table, &pg->port_groups, &pg->new, &pg->deleted, &pg->updated); - node->changed = !sset_is_empty(&pg->new) || !sset_is_empty(&pg->deleted) - || !sset_is_empty(&pg->updated); + if (!sset_is_empty(&pg->new) || !sset_is_empty(&pg->deleted) || + !sset_is_empty(&pg->updated)) { + engine_set_node_state(node, EN_UPDATED); + } else { + engine_set_node_state(node, EN_VALID); + } 
pg->change_tracked = true; - node->changed = true; return true; } @@ -1091,7 +1097,7 @@ en_runtime_data_run(struct engine_node *node) update_ct_zones(local_lports, local_datapaths, ct_zones, ct_zone_bitmap, pending_ct_zones); - node->changed = true; + engine_set_node_state(node, EN_UPDATED); } static bool @@ -1157,10 +1163,10 @@ en_mff_ovn_geneve_run(struct engine_node *node) enum mf_field_id mff_ovn_geneve = ofctrl_get_mf_field_id(); if (data->mff_ovn_geneve != mff_ovn_geneve) { data->mff_ovn_geneve = mff_ovn_geneve; - node->changed = true; + engine_set_node_state(node, EN_UPDATED); return; } - node->changed = false; + engine_set_node_state(node, EN_VALID); } struct ed_type_flow_output { @@ -1322,7 +1328,7 @@ en_flow_output_run(struct engine_node *node) active_tunnels, flow_table); - node->changed = true; + engine_set_node_state(node, EN_UPDATED); } static bool @@ -1404,7 +1410,7 @@ flow_output_sb_logical_flow_handler(struct engine_node *node) flow_table, group_table, meter_table, lfrr, conj_id_ofs); - node->changed = true; + engine_set_node_state(node, EN_UPDATED); return handled; } @@ -1427,7 +1433,7 @@ flow_output_sb_mac_binding_handler(struct engine_node *node) lflow_handle_changed_neighbors(sbrec_port_binding_by_name, mac_binding_table, flow_table); - node->changed = true; + engine_set_node_state(node, EN_UPDATED); return true; } @@ -1531,7 +1537,7 @@ flow_output_sb_port_binding_handler(struct engine_node *node) chassis, ct_zones, local_datapaths, active_tunnels, flow_table); - node->changed = true; + engine_set_node_state(node, EN_UPDATED); return true; } @@ -1580,7 +1586,7 @@ flow_output_sb_multicast_group_handler(struct engine_node *node) mff_ovn_geneve, chassis, ct_zones, local_datapaths, flow_table); - node->changed = true; + engine_set_node_state(node, EN_UPDATED); return true; } @@ -1694,7 +1700,9 @@ _flow_output_resource_ref_handler(struct engine_node *node, conj_id_ofs, &changed)) { return false; } - node->changed = changed || node->changed; + if 
(changed) { + engine_set_node_state(node, EN_UPDATED); + } } SSET_FOR_EACH (ref_name, updated) { if (!lflow_handle_changed_ref(ref_type, ref_name, @@ -1707,7 +1715,9 @@ _flow_output_resource_ref_handler(struct engine_node *node, conj_id_ofs, &changed)) { return false; } - node->changed = changed || node->changed; + if (changed) { + engine_set_node_state(node, EN_UPDATED); + } } SSET_FOR_EACH (ref_name, new) { if (!lflow_handle_changed_ref(ref_type, ref_name, @@ -1720,7 +1730,9 @@ _flow_output_resource_ref_handler(struct engine_node *node, conj_id_ofs, &changed)) { return false; } - node->changed = changed || node->changed; + if (changed) { + engine_set_node_state(node, EN_UPDATED); + } } return true; @@ -1922,7 +1934,11 @@ main(int argc, char *argv[]) engine_add_input(&en_runtime_data, &en_sb_port_binding, runtime_data_sb_port_binding_handler); - engine_init(&en_flow_output); + /* Get the sorted engine nodes to be used for every engine run. */ + size_t en_count = 0; + struct engine_node **en_nodes = engine_get_nodes(&en_flow_output, + &en_count); + engine_init(en_nodes, en_count); ofctrl_init(&ed_flow_output.group_table, &ed_flow_output.meter_table, @@ -1941,9 +1957,6 @@ main(int argc, char *argv[]) unixctl_command_register("inject-pkt", "MICROFLOW", 1, 1, inject_pkt, &pending_pkt); - uint64_t engine_run_id = 0; - bool engine_run_done = true; - unsigned int ovs_cond_seqno = UINT_MAX; unsigned int ovnsb_cond_seqno = UINT_MAX; @@ -1951,7 +1964,7 @@ main(int argc, char *argv[]) exiting = false; restart = false; while (!exiting) { - engine_run_id++; + engine_init_run(en_nodes, en_count, &en_flow_output); update_sb_db(ovs_idl_loop.idl, ovnsb_idl_loop.idl); update_ssl_config(ovsrec_ssl_table_get(ovs_idl_loop.idl)); @@ -2044,15 +2057,13 @@ main(int argc, char *argv[]) * this round of engine_run and continue processing * acculated changes incrementally later when * ofctrl_can_put() returns true. 
*/ - if (engine_run_done) { + if (!engine_aborted(&en_flow_output)) { engine_set_abort_recompute(true); - engine_run_done = engine_run(&en_flow_output, - engine_run_id); + engine_run(en_nodes, en_count); } } else { engine_set_abort_recompute(false); - engine_run_done = true; - engine_run(&en_flow_output, engine_run_id); + engine_run(en_nodes, en_count); } } stopwatch_stop(CONTROLLER_LOOP_STOPWATCH_NAME, @@ -2071,7 +2082,7 @@ main(int argc, char *argv[]) sbrec_meter_table_get(ovnsb_idl_loop.idl), get_nb_cfg(sbrec_sb_global_table_get( ovnsb_idl_loop.idl)), - en_flow_output.changed); + engine_node_changed(&en_flow_output)); pinctrl_run(ovnsb_idl_txn, sbrec_datapath_binding_by_key, sbrec_port_binding_by_datapath, @@ -2087,7 +2098,7 @@ main(int argc, char *argv[]) &ed_runtime_data.local_datapaths, &ed_runtime_data.active_tunnels); - if (en_runtime_data.changed) { + if (engine_node_changed(&en_runtime_data)) { update_sb_monitors(ovnsb_idl_loop.idl, chassis, &ed_runtime_data.local_lports, &ed_runtime_data.local_datapaths); @@ -2095,17 +2106,17 @@ main(int argc, char *argv[]) } } - if (engine_need_run(&en_flow_output, engine_run_id)) { + if (engine_need_run(en_nodes, en_count)) { VLOG_DBG("engine did not run, force recompute next time: " "br_int %p, chassis %p", br_int, chassis); engine_set_force_recompute(true); poll_immediate_wake(); - } else if (!engine_run_done) { + } else if (engine_aborted(&en_flow_output)) { VLOG_DBG("engine was aborted, force recompute next time: " "br_int %p, chassis %p", br_int, chassis); engine_set_force_recompute(true); poll_immediate_wake(); - } else if (!engine_has_run(&en_flow_output, engine_run_id)) { + } else if (!engine_has_run(&en_flow_output)) { VLOG_DBG("engine did not run, and it was not needed" " either: br_int %p, chassis %p", br_int, chassis); @@ -2133,8 +2144,7 @@ main(int argc, char *argv[]) } } else { VLOG_DBG("Pending_pkt conn but br_int %p or chassis " - "%p not ready. 
run-id: %"PRIu64, br_int, - chassis, engine_run_id); + "%p not ready.", br_int, chassis); unixctl_command_reply_error(pending_pkt.conn, "ovn-controller not ready."); } @@ -2183,7 +2193,7 @@ main(int argc, char *argv[]) } engine_set_context(NULL); - engine_cleanup(&en_flow_output); + engine_cleanup(en_nodes, en_count); /* It's time to exit. Clean up the databases if we are not restarting */ if (!restart) { diff --git a/lib/inc-proc-eng.c b/lib/inc-proc-eng.c index 8a085e2..b438a15 100644 --- a/lib/inc-proc-eng.c +++ b/lib/inc-proc-eng.c @@ -34,6 +34,13 @@ static bool engine_force_recompute = false; static bool engine_abort_recompute = false; static const struct engine_context *engine_context; +static const char *engine_node_state_name[EN_STATE_MAX] = { + [EN_STALE] = "Stale", + [EN_UPDATED] = "Updated", + [EN_VALID] = "Valid", + [EN_ABORTED] = "Aborted", +}; + void engine_set_force_recompute(bool val) { @@ -58,26 +65,62 @@ engine_set_context(const struct engine_context *ctx) engine_context = ctx; } -void -engine_init(struct engine_node *node) +/* Builds the topologically sorted 'sorted_nodes' array starting from + * 'node'. + */ +static struct engine_node ** +engine_topo_sort(struct engine_node *node, struct engine_node **sorted_nodes, + size_t *n_count, size_t *n_size) { + /* It's not so efficient to walk the array of already sorted nodes but + * we know that sorting is done only once at startup so it's ok for now. 
+ */ + for (size_t i = 0; i < *n_count; i++) { + if (sorted_nodes[i] == node) { + return sorted_nodes; + } + } + for (size_t i = 0; i < node->n_inputs; i++) { - engine_init(node->inputs[i].node); + sorted_nodes = engine_topo_sort(node->inputs[i].node, sorted_nodes, + n_count, n_size); } - if (node->init) { - node->init(node); + if (*n_count == *n_size) { + sorted_nodes = x2nrealloc(sorted_nodes, n_size, sizeof *sorted_nodes); } + sorted_nodes[(*n_count)] = node; + (*n_count)++; + return sorted_nodes; +} + +struct engine_node ** +engine_get_nodes(struct engine_node *root_node, size_t *n_count) +{ + size_t n_size = 0; + + *n_count = 0; + return engine_topo_sort(root_node, NULL, n_count, &n_size); } void -engine_cleanup(struct engine_node *node) +engine_init(struct engine_node **nodes, size_t n_count) { - for (size_t i = 0; i < node->n_inputs; i++) { - engine_cleanup(node->inputs[i].node); + for (size_t i = 0; i < n_count; i++) { + if (nodes[i]->init) { + nodes[i]->init(nodes[i]); + } } - if (node->cleanup) { - node->cleanup(node); +} + +void +engine_cleanup(struct engine_node **nodes, size_t n_count) +{ + for (size_t i = 0; i < n_count; i++) { + if (nodes[i]->cleanup) { + nodes[i]->cleanup(nodes[i]); + } } + free(nodes); } struct engine_node * @@ -128,16 +171,66 @@ engine_ovsdb_node_add_index(struct engine_node *node, const char *name, ed->n_indexes ++; } +void +engine_set_node_state_at(struct engine_node *node, + enum engine_node_state state, + const char *where) +{ + if (node->state == state) { + return; + } + + VLOG_DBG("%s: node: %s, old_state %s, new_state %s", + where, node->name, + engine_node_state_name[node->state], + engine_node_state_name[state]); + + node->state = state; +} + +static bool +engine_node_valid(struct engine_node *node) +{ + return (node->state == EN_UPDATED || node->state == EN_VALID); +} + +bool +engine_node_changed(struct engine_node *node) +{ + return node->state == EN_UPDATED; +} + +bool +engine_has_run(struct engine_node *root_node) +{ 
+ return root_node->state != EN_STALE; +} + bool -engine_has_run(struct engine_node *node, uint64_t run_id) +engine_aborted(struct engine_node *node) { - return node->run_id == run_id; + return node->state == EN_ABORTED; +} + +void +engine_init_run(struct engine_node **nodes, size_t n_count, + struct engine_node *root_node) +{ + /* No need to reinitialize if last run didn't happen. */ + if (!engine_has_run(root_node)) { + return; + } + + VLOG_DBG("Initializing new run"); + for (size_t i = 0; i < n_count; i++) { + engine_set_node_state(nodes[i], EN_STALE); + } } /* Do a full recompute (or at least try). If we're not allowed then * mark the node as "aborted". */ -static bool +static void engine_recompute(struct engine_node *node, bool forced, bool allowed) { VLOG_DBG("node: %s, recompute (%s)", node->name, @@ -145,12 +238,12 @@ engine_recompute(struct engine_node *node, bool forced, bool allowed) if (!allowed) { VLOG_DBG("node: %s, recompute aborted", node->name); - return false; + engine_set_node_state(node, EN_ABORTED); + return; } + /* Run the node handler which might change state. */ node->run(node); - VLOG_DBG("node: %s, changed: %d", node->name, node->changed); - return true; } /* Return true if the node could be computed without triggerring a full @@ -161,7 +254,7 @@ engine_compute(struct engine_node *node, bool recompute_allowed) { for (size_t i = 0; i < node->n_inputs; i++) { /* If the input node data changed call its change handler. 
*/ - if (node->inputs[i].node->changed) { + if (node->inputs[i].node->state == EN_UPDATED) { VLOG_DBG("node: %s, handle change for input %s", node->name, node->inputs[i].node->name); @@ -172,57 +265,61 @@ engine_compute(struct engine_node *node, bool recompute_allowed) VLOG_DBG("node: %s, can't handle change for input %s, " "fall back to recompute", node->name, node->inputs[i].node->name); - if (!engine_recompute(node, false, recompute_allowed)) { + engine_recompute(node, false, recompute_allowed); + if (engine_aborted(node)) { return false; } } } } - return true; } -bool engine_run(struct engine_node *node, uint64_t run_id) +static void +engine_run_node(struct engine_node *node) { - if (node->run_id == run_id) { - /* The node was already updated in this run (could be input for - * multiple other nodes). Stop processing. - */ - return true; - } - - /* Initialize the node for this run. */ - node->run_id = run_id; - node->changed = false; - if (!node->n_inputs) { + /* Run the node handler which might change state. */ node->run(node); - VLOG_DBG("node: %s, changed: %d", node->name, node->changed); - return true; + return; } + bool input_stale = false; for (size_t i = 0; i < node->n_inputs; i++) { - if (!engine_run(node->inputs[i].node, run_id)) { - return false; + if (!engine_node_valid(node->inputs[i].node)) { + /* If the input node aborted computation, move to EN_ABORTED. + * This will be propagated to following nodes. + */ + if (engine_aborted(node->inputs[i].node)) { + engine_set_node_state(node, EN_ABORTED); + } + + input_stale = true; } } - bool need_compute = false; + /* If at least one input is stale, don't change state. */ + if (input_stale) { + return; + } if (engine_force_recompute) { - return engine_recompute(node, true, !engine_abort_recompute); + engine_recompute(node, true, !engine_abort_recompute); + return; } /* If any of the inputs updated data but there is no change_handler, then * recompute the current node too. 
*/ + bool need_compute = false; for (size_t i = 0; i < node->n_inputs; i++) { - if (node->inputs[i].node->changed) { + if (node->inputs[i].node->state == EN_UPDATED) { need_compute = true; /* Trigger a recompute if we don't have a change handler. */ if (!node->inputs[i].change_handler) { - return engine_recompute(node, false, !engine_abort_recompute); + engine_recompute(node, false, !engine_abort_recompute); + return; } } } @@ -231,33 +328,42 @@ bool engine_run(struct engine_node *node, uint64_t run_id) /* If we couldn't compute the node we either aborted or triggered * a full recompute. In any case, stop processing. */ - return engine_compute(node, !engine_abort_recompute); + if (!engine_compute(node, !engine_abort_recompute)) { + return; + } } - VLOG_DBG("node: %s, changed: %d", node->name, node->changed); - return true; + /* If we reached this point, either the node was updated or its state is + * still valid. + */ + if (!engine_node_changed(node)) { + engine_set_node_state(node, EN_VALID); + } } -bool -engine_need_run(struct engine_node *node, uint64_t run_id) +void +engine_run(struct engine_node **nodes, size_t n_count) { - size_t i; - - if (node->run_id == run_id) { - return false; + for (size_t i = 0; i < n_count; i++) { + engine_run_node(nodes[i]); } +} - if (!node->n_inputs) { - node->run(node); - VLOG_DBG("input node: %s, changed: %d", node->name, node->changed); - return node->changed; - } +bool +engine_need_run(struct engine_node **nodes, size_t n_count) +{ + for (size_t i = 0; i < n_count; i++) { + /* Check only leaf nodes. 
*/ + if (nodes[i]->n_inputs) { + continue; + } - for (i = 0; i < node->n_inputs; i++) { - if (engine_need_run(node->inputs[i].node, run_id)) { + nodes[i]->run(nodes[i]); + VLOG_DBG("input node: %s, state: %s", nodes[i]->name, + engine_node_state_name[nodes[i]->state]); + if (nodes[i]->state == EN_UPDATED) { return true; } } - return false; } diff --git a/lib/inc-proc-eng.h b/lib/inc-proc-eng.h index abd41b2..9a35f1f 100644 --- a/lib/inc-proc-eng.h +++ b/lib/inc-proc-eng.h @@ -82,10 +82,21 @@ struct engine_node_input { bool (*change_handler)(struct engine_node *node); }; -struct engine_node { - /* A unique id to distinguish each iteration of the engine_run(). */ - uint64_t run_id; +enum engine_node_state { + EN_STALE, /* Data in the node is not up to date with the DB. */ + EN_UPDATED, /* Data in the node is valid but was updated during the + * last run. + */ + EN_VALID, /* Data in the node is valid and didn't change during the + * last run. + */ + EN_ABORTED, /* During the last run, processing was aborted for + * this node. + */ + EN_STATE_MAX, +}; +struct engine_node { /* A unique name for each node. */ char *name; @@ -102,8 +113,8 @@ struct engine_node { * node. */ void *data; - /* Whether the data changed in the last engine run. */ - bool changed; + /* State of the node after the last engine run. */ + enum engine_node_state state; /* Method to initialize data. It may be NULL. */ void (*init)(struct engine_node *); @@ -116,23 +127,35 @@ struct engine_node { void (*run)(struct engine_node *); }; -/* Initialize the data for the engine nodes recursively. It calls each node's +/* Return the array of topologically sorted nodes when starting from + * 'root_node'. Stores the number of nodes in 'n_count'. + * It should be called before the main loop. + */ +struct engine_node **engine_get_nodes(struct engine_node *root_node, + size_t *n_count); + +/* Initialize the data for the engine nodes. It calls each node's * init() method if not NULL. 
It should be called before the main loop. */ -void engine_init(struct engine_node *); +void engine_init(struct engine_node **nodes, size_t n_count); + +/* Initialize the engine nodes for a new run. It should be called in the + * main processing loop before every potential engine_run(). + */ +void engine_init_run(struct engine_node **nodes, size_t n_count, + struct engine_node *root_node); /* Execute the processing recursively, which should be called in the main - * loop. Returns true if the execution is compelte, false if it is aborted, - * which could happen when engine_abort_recompute is set. */ -bool engine_run(struct engine_node *, uint64_t run_id); + * loop. Updates the engine node's states accordingly. + */ +void engine_run(struct engine_node **nodes, size_t n_count); -/* Clean up the data for the engine nodes recursively. It calls each node's +/* Clean up the data for the engine nodes. It calls each node's * cleanup() method if not NULL. It should be called before the program * terminates. */ -void engine_cleanup(struct engine_node *); +void engine_cleanup(struct engine_node **nodes, size_t n_count); /* Check if engine needs to run but didn't. */ -bool -engine_need_run(struct engine_node *, uint64_t run_id); +bool engine_need_run(struct engine_node **nodes, size_t n_count); /* Get the input node with <name> for <node> */ struct engine_node * engine_get_input(const char *input_name, @@ -159,8 +182,22 @@ const struct engine_context * engine_get_context(void); void engine_set_context(const struct engine_context *); -/* Return true if the engine has run for 'node' in the 'run_id' iteration. */ -bool engine_has_run(struct engine_node *node, uint64_t run_id); +void engine_set_node_state_at(struct engine_node *node, + enum engine_node_state state, + const char *where); + +/* Return true if during the last iteration the node's data was updated. 
*/ +bool engine_node_changed(struct engine_node *node); + +/* Return true if the engine has run for 'node' in the last iteration. */ +bool engine_has_run(struct engine_node *node); + +/* Returns true if during the last engine run we had to abort processing. */ +bool engine_aborted(struct engine_node *node); + +/* Set the state of the node and log changes. */ +#define engine_set_node_state(node, state) \ + engine_set_node_state_at(node, state, OVS_SOURCE_LOCATOR) struct ed_ovsdb_index { const char *name; @@ -187,6 +224,7 @@ void engine_ovsdb_node_add_index(struct engine_node *, const char *name, struct engine_node en_##NAME = { \ .name = NAME_STR, \ .data = &ed_##NAME, \ + .state = EN_STALE, \ .init = en_##NAME##_init, \ .run = en_##NAME##_run, \ .cleanup = en_##NAME##_cleanup, \ @@ -201,10 +239,10 @@ en_##DB_NAME##_##TBL_NAME##_run(struct engine_node *node) \ const struct DB_NAME##rec_##TBL_NAME##_table *table = \ EN_OVSDB_GET(node); \ if (DB_NAME##rec_##TBL_NAME##_table_track_get_first(table)) { \ - node->changed = true; \ + engine_set_node_state(node, EN_UPDATED); \ return; \ } \ - node->changed = false; \ + engine_set_node_state(node, EN_VALID); \ } \ static void (*en_##DB_NAME##_##TBL_NAME##_init)(struct engine_node *node) \ = NULL; \
This commit transforms the 'changed' field in struct engine_node into a 'state' field. Possible node states are: - "Stale": data in the node is not up to date with the DB. - "Updated": data in the node is valid but was updated during the last run of the engine. - "Valid": data in the node is valid and didn't change during the last run of the engine. - "Aborted": during the last run, processing was aborted for this node. This commit also further refactors the I-P engine: - instead of recursively performing all the engine processing, a preprocessing stage (engine_get_nodes()) is added before the main processing loop is executed. It topologically sorts the nodes in the engine such that all inputs of a given node appear in the sorted array before the node itself. This simplifies the code in engine_run() a bit. - remove the need for an engine_run_id by using the newly added states. Signed-off-by: Dumitru Ceara <dceara@redhat.com> --- controller/ovn-controller.c | 88 ++++++++++------- lib/inc-proc-eng.c | 218 ++++++++++++++++++++++++++++++++----------- lib/inc-proc-eng.h | 74 +++++++++++---- 3 files changed, 267 insertions(+), 113 deletions(-)