diff mbox series

[ovs-dev,v3,ovn,2/4] ovn-controller: Add per node states to I-P engine.

Message ID 20191114170524.30391.61326.stgit@dceara.remote.csb
State Superseded
Headers show
Series [ovs-dev,v3,ovn,1/4] ovn-controller: Refactor I-P engine_run() tracking. | expand

Commit Message

Dumitru Ceara Nov. 14, 2019, 5:05 p.m. UTC
This commit transforms the 'changed' field in struct engine_node in a
'state' field. Possible node states are:
- "Stale": data in the node is not up to date with the DB.
- "Updated": data in the node is valid but was updated during
  the last run of the engine.
- "Valid": data in the node is valid and didn't change during
  the last run of the engine.
- "Aborted": during the last run, processing was aborted for
  this node.

This commit also further refactors the I-P engine:
- instead of recursively performing all the engine processing a
  preprocessing stage is added (engine_get_nodes()) before the main processing
  loop is executed in order to topologically sort nodes in the engine such
  that all inputs of a given node appear in the sorted array before the node
  itself. This simplifies a bit the code in engine_run().
- remove the need for using an engine_run_id by using the newly added states.

Signed-off-by: Dumitru Ceara <dceara@redhat.com>
---
 controller/ovn-controller.c |   88 ++++++++++-------
 lib/inc-proc-eng.c          |  218 ++++++++++++++++++++++++++++++++-----------
 lib/inc-proc-eng.h          |   74 +++++++++++----
 3 files changed, 267 insertions(+), 113 deletions(-)

Comments

Numan Siddique Nov. 18, 2019, 7:38 a.m. UTC | #1
Thanks for this series Dumitru. This is really helpful.

Few comments below.


On Thu, Nov 14, 2019 at 10:39 PM Dumitru Ceara <dceara@redhat.com> wrote:
>
> This commit transforms the 'changed' field in struct engine_node in a
> 'state' field. Possible node states are:
> - "Stale": data in the node is not up to date with the DB.
> - "Updated": data in the node is valid but was updated during
>   the last run of the engine.
> - "Valid": data in the node is valid and didn't change during
>   the last run of the engine.
> - "Aborted": during the last run, processing was aborted for
>   this node.
>
> This commit also further refactors the I-P engine:
> - instead of recursively performing all the engine processing a
>   preprocessing stage is added (engine_get_nodes()) before the main processing
>   loop is executed in order to topologically sort nodes in the engine such
>   that all inputs of a given node appear in the sorted array before the node
>   itself. This simplifies a bit the code in engine_run().
> - remove the need for using an engine_run_id by using the newly added states.
>
> Signed-off-by: Dumitru Ceara <dceara@redhat.com>
> ---

After applying this patch I  notice that adding a logical switch or
logical router is resulting
the engine_run() to be called twice  i.e the function
engine_need_run() always returns true when I
run "ovn-nbctl ls-add sw0" and engine_run() is called again.

If you enable debug and add a logical switch you will see [1] to be
hit all the time which is not the case
before this patch.

Thanks
Numan


>  controller/ovn-controller.c |   88 ++++++++++-------
>  lib/inc-proc-eng.c          |  218 ++++++++++++++++++++++++++++++++-----------
>  lib/inc-proc-eng.h          |   74 +++++++++++----
>  3 files changed, 267 insertions(+), 113 deletions(-)
>
> diff --git a/controller/ovn-controller.c b/controller/ovn-controller.c
> index 3922f3d..4f8ceae 100644
> --- a/controller/ovn-controller.c
> +++ b/controller/ovn-controller.c
> @@ -758,10 +758,10 @@ en_ofctrl_is_connected_run(struct engine_node *node)
>          (struct ed_type_ofctrl_is_connected *)node->data;
>      if (data->connected != ofctrl_is_connected()) {
>          data->connected = !data->connected;
> -        node->changed = true;
> +        engine_set_node_state(node, EN_UPDATED);
>          return;
>      }
> -    node->changed = false;
> +    engine_set_node_state(node, EN_VALID);
>  }
>
>  struct ed_type_addr_sets {
> @@ -811,7 +811,7 @@ en_addr_sets_run(struct engine_node *node)
>      addr_sets_init(as_table, &as->addr_sets);
>
>      as->change_tracked = false;
> -    node->changed = true;
> +    engine_set_node_state(node, EN_UPDATED);
>  }
>
>  static bool
> @@ -830,11 +830,14 @@ addr_sets_sb_address_set_handler(struct engine_node *node)
>      addr_sets_update(as_table, &as->addr_sets, &as->new,
>                       &as->deleted, &as->updated);
>
> -    node->changed = !sset_is_empty(&as->new) || !sset_is_empty(&as->deleted)
> -                    || !sset_is_empty(&as->updated);
> +    if (!sset_is_empty(&as->new) || !sset_is_empty(&as->deleted) ||
> +            !sset_is_empty(&as->updated)) {
> +        engine_set_node_state(node, EN_UPDATED);
> +    } else {
> +        engine_set_node_state(node, EN_VALID);
> +    }
>
>      as->change_tracked = true;
> -    node->changed = true;
>      return true;
>  }
>
> @@ -885,7 +888,7 @@ en_port_groups_run(struct engine_node *node)
>      port_groups_init(pg_table, &pg->port_groups);
>
>      pg->change_tracked = false;
> -    node->changed = true;
> +    engine_set_node_state(node, EN_UPDATED);
>  }
>
>  static bool
> @@ -904,11 +907,14 @@ port_groups_sb_port_group_handler(struct engine_node *node)
>      port_groups_update(pg_table, &pg->port_groups, &pg->new,
>                       &pg->deleted, &pg->updated);
>
> -    node->changed = !sset_is_empty(&pg->new) || !sset_is_empty(&pg->deleted)
> -                    || !sset_is_empty(&pg->updated);
> +    if (!sset_is_empty(&pg->new) || !sset_is_empty(&pg->deleted) ||
> +            !sset_is_empty(&pg->updated)) {
> +        engine_set_node_state(node, EN_UPDATED);
> +    } else {
> +        engine_set_node_state(node, EN_VALID);
> +    }
>
>      pg->change_tracked = true;
> -    node->changed = true;
>      return true;
>  }
>
> @@ -1091,7 +1097,7 @@ en_runtime_data_run(struct engine_node *node)
>      update_ct_zones(local_lports, local_datapaths, ct_zones,
>                      ct_zone_bitmap, pending_ct_zones);
>
> -    node->changed = true;
> +    engine_set_node_state(node, EN_UPDATED);
>  }
>
>  static bool
> @@ -1157,10 +1163,10 @@ en_mff_ovn_geneve_run(struct engine_node *node)
>      enum mf_field_id mff_ovn_geneve = ofctrl_get_mf_field_id();
>      if (data->mff_ovn_geneve != mff_ovn_geneve) {
>          data->mff_ovn_geneve = mff_ovn_geneve;
> -        node->changed = true;
> +        engine_set_node_state(node, EN_UPDATED);
>          return;
>      }
> -    node->changed = false;
> +    engine_set_node_state(node, EN_VALID);
>  }
>
>  struct ed_type_flow_output {
> @@ -1322,7 +1328,7 @@ en_flow_output_run(struct engine_node *node)
>                   active_tunnels,
>                   flow_table);
>
> -    node->changed = true;
> +    engine_set_node_state(node, EN_UPDATED);
>  }
>
>  static bool
> @@ -1404,7 +1410,7 @@ flow_output_sb_logical_flow_handler(struct engine_node *node)
>                flow_table, group_table, meter_table, lfrr,
>                conj_id_ofs);
>
> -    node->changed = true;
> +    engine_set_node_state(node, EN_UPDATED);
>      return handled;
>  }
>
> @@ -1427,7 +1433,7 @@ flow_output_sb_mac_binding_handler(struct engine_node *node)
>      lflow_handle_changed_neighbors(sbrec_port_binding_by_name,
>              mac_binding_table, flow_table);
>
> -    node->changed = true;
> +    engine_set_node_state(node, EN_UPDATED);
>      return true;
>  }
>
> @@ -1531,7 +1537,7 @@ flow_output_sb_port_binding_handler(struct engine_node *node)
>              chassis, ct_zones, local_datapaths,
>              active_tunnels, flow_table);
>
> -    node->changed = true;
> +    engine_set_node_state(node, EN_UPDATED);
>      return true;
>  }
>
> @@ -1580,7 +1586,7 @@ flow_output_sb_multicast_group_handler(struct engine_node *node)
>              mff_ovn_geneve, chassis, ct_zones, local_datapaths,
>              flow_table);
>
> -    node->changed = true;
> +    engine_set_node_state(node, EN_UPDATED);
>      return true;
>
>  }
> @@ -1694,7 +1700,9 @@ _flow_output_resource_ref_handler(struct engine_node *node,
>                      conj_id_ofs, &changed)) {
>              return false;
>          }
> -        node->changed = changed || node->changed;
> +        if (changed) {
> +            engine_set_node_state(node, EN_UPDATED);
> +        }
>      }
>      SSET_FOR_EACH (ref_name, updated) {
>          if (!lflow_handle_changed_ref(ref_type, ref_name,
> @@ -1707,7 +1715,9 @@ _flow_output_resource_ref_handler(struct engine_node *node,
>                      conj_id_ofs, &changed)) {
>              return false;
>          }
> -        node->changed = changed || node->changed;
> +        if (changed) {
> +            engine_set_node_state(node, EN_UPDATED);
> +        }
>      }
>      SSET_FOR_EACH (ref_name, new) {
>          if (!lflow_handle_changed_ref(ref_type, ref_name,
> @@ -1720,7 +1730,9 @@ _flow_output_resource_ref_handler(struct engine_node *node,
>                      conj_id_ofs, &changed)) {
>              return false;
>          }
> -        node->changed = changed || node->changed;
> +        if (changed) {
> +            engine_set_node_state(node, EN_UPDATED);
> +        }
>      }
>
>      return true;
> @@ -1922,7 +1934,11 @@ main(int argc, char *argv[])
>      engine_add_input(&en_runtime_data, &en_sb_port_binding,
>                       runtime_data_sb_port_binding_handler);
>
> -    engine_init(&en_flow_output);
> +    /* Get the sorted engine nodes to be used for every engine run. */
> +    size_t en_count = 0;
> +    struct engine_node **en_nodes = engine_get_nodes(&en_flow_output,
> +                                                     &en_count);
> +    engine_init(en_nodes, en_count);
>
>      ofctrl_init(&ed_flow_output.group_table,
>                  &ed_flow_output.meter_table,
> @@ -1941,9 +1957,6 @@ main(int argc, char *argv[])
>      unixctl_command_register("inject-pkt", "MICROFLOW", 1, 1, inject_pkt,
>                               &pending_pkt);
>
> -    uint64_t engine_run_id = 0;
> -    bool engine_run_done = true;
> -
>      unsigned int ovs_cond_seqno = UINT_MAX;
>      unsigned int ovnsb_cond_seqno = UINT_MAX;
>
> @@ -1951,7 +1964,7 @@ main(int argc, char *argv[])
>      exiting = false;
>      restart = false;
>      while (!exiting) {
> -        engine_run_id++;
> +        engine_init_run(en_nodes, en_count, &en_flow_output);
>
>          update_sb_db(ovs_idl_loop.idl, ovnsb_idl_loop.idl);
>          update_ssl_config(ovsrec_ssl_table_get(ovs_idl_loop.idl));
> @@ -2044,15 +2057,13 @@ main(int argc, char *argv[])
>                               * this round of engine_run and continue processing
>                               * acculated changes incrementally later when
>                               * ofctrl_can_put() returns true. */
> -                            if (engine_run_done) {
> +                            if (!engine_aborted(&en_flow_output)) {
>                                  engine_set_abort_recompute(true);
> -                                engine_run_done = engine_run(&en_flow_output,
> -                                                             engine_run_id);
> +                                engine_run(en_nodes, en_count);
>                              }
>                          } else {
>                              engine_set_abort_recompute(false);
> -                            engine_run_done = true;
> -                            engine_run(&en_flow_output, engine_run_id);
> +                            engine_run(en_nodes, en_count);
>                          }
>                      }
>                      stopwatch_stop(CONTROLLER_LOOP_STOPWATCH_NAME,
> @@ -2071,7 +2082,7 @@ main(int argc, char *argv[])
>                                 sbrec_meter_table_get(ovnsb_idl_loop.idl),
>                                 get_nb_cfg(sbrec_sb_global_table_get(
>                                                ovnsb_idl_loop.idl)),
> -                               en_flow_output.changed);
> +                               engine_node_changed(&en_flow_output));
>                      pinctrl_run(ovnsb_idl_txn,
>                                  sbrec_datapath_binding_by_key,
>                                  sbrec_port_binding_by_datapath,
> @@ -2087,7 +2098,7 @@ main(int argc, char *argv[])
>                                  &ed_runtime_data.local_datapaths,
>                                  &ed_runtime_data.active_tunnels);
>
> -                    if (en_runtime_data.changed) {
> +                    if (engine_node_changed(&en_runtime_data)) {
>                          update_sb_monitors(ovnsb_idl_loop.idl, chassis,
>                                             &ed_runtime_data.local_lports,
>                                             &ed_runtime_data.local_datapaths);
> @@ -2095,17 +2106,17 @@ main(int argc, char *argv[])
>                  }
>
>              }
> -            if (engine_need_run(&en_flow_output, engine_run_id)) {
> +            if (engine_need_run(en_nodes, en_count)) {
>                  VLOG_DBG("engine did not run, force recompute next time: "
>                              "br_int %p, chassis %p", br_int, chassis);
>                  engine_set_force_recompute(true);
>                  poll_immediate_wake();
> -            } else if (!engine_run_done) {
> +            } else if (engine_aborted(&en_flow_output)) {
>                  VLOG_DBG("engine was aborted, force recompute next time: "
>                           "br_int %p, chassis %p", br_int, chassis);
>                  engine_set_force_recompute(true);
>                  poll_immediate_wake();
> -            } else if (!engine_has_run(&en_flow_output, engine_run_id)) {
> +            } else if (!engine_has_run(&en_flow_output)) {
>                  VLOG_DBG("engine did not run, and it was not needed"
>                           " either: br_int %p, chassis %p",
>                           br_int, chassis);
> @@ -2133,8 +2144,7 @@ main(int argc, char *argv[])
>                      }
>                  } else {
>                      VLOG_DBG("Pending_pkt conn but br_int %p or chassis "
> -                             "%p not ready. run-id: %"PRIu64, br_int,
> -                             chassis, engine_run_id);
> +                             "%p not ready.", br_int, chassis);
>                      unixctl_command_reply_error(pending_pkt.conn,
>                          "ovn-controller not ready.");
>                  }
> @@ -2183,7 +2193,7 @@ main(int argc, char *argv[])
>      }
>
>      engine_set_context(NULL);
> -    engine_cleanup(&en_flow_output);
> +    engine_cleanup(en_nodes, en_count);
>
>      /* It's time to exit.  Clean up the databases if we are not restarting */
>      if (!restart) {
> diff --git a/lib/inc-proc-eng.c b/lib/inc-proc-eng.c
> index 8a085e2..b438a15 100644
> --- a/lib/inc-proc-eng.c
> +++ b/lib/inc-proc-eng.c
> @@ -34,6 +34,13 @@ static bool engine_force_recompute = false;
>  static bool engine_abort_recompute = false;
>  static const struct engine_context *engine_context;
>
> +static const char *engine_node_state_name[EN_STATE_MAX] = {
> +    [EN_STALE]   = "Stale",
> +    [EN_UPDATED] = "Updated",
> +    [EN_VALID]   = "Valid",
> +    [EN_ABORTED] = "Aborted",
> +};
> +
>  void
>  engine_set_force_recompute(bool val)
>  {
> @@ -58,26 +65,62 @@ engine_set_context(const struct engine_context *ctx)
>      engine_context = ctx;
>  }
>
> -void
> -engine_init(struct engine_node *node)
> +/* Builds the topologically sorted 'sorted_nodes' array starting from
> + * 'node'.
> + */
> +static struct engine_node **
> +engine_topo_sort(struct engine_node *node, struct engine_node **sorted_nodes,
> +                 size_t *n_count, size_t *n_size)
>  {
> +    /* It's not so efficient to walk the array of already sorted nodes but
> +     * we know that sorting is done only once at startup so it's ok for now.
> +     */
> +    for (size_t i = 0; i < *n_count; i++) {
> +        if (sorted_nodes[i] == node) {
> +            return sorted_nodes;
> +        }
> +    }
> +
>      for (size_t i = 0; i < node->n_inputs; i++) {
> -        engine_init(node->inputs[i].node);
> +        sorted_nodes = engine_topo_sort(node->inputs[i].node, sorted_nodes,
> +                                        n_count, n_size);
>      }
> -    if (node->init) {
> -        node->init(node);
> +    if (*n_count == *n_size) {
> +        sorted_nodes = x2nrealloc(sorted_nodes, n_size, sizeof *sorted_nodes);
>      }
> +    sorted_nodes[(*n_count)] = node;
> +    (*n_count)++;
> +    return sorted_nodes;
> +}
> +
> +struct engine_node **
> +engine_get_nodes(struct engine_node *root_node, size_t *n_count)
> +{
> +    size_t n_size = 0;
> +
> +    *n_count = 0;
> +    return engine_topo_sort(root_node, NULL, n_count, &n_size);
>  }
>
>  void
> -engine_cleanup(struct engine_node *node)
> +engine_init(struct engine_node **nodes, size_t n_count)
>  {
> -    for (size_t i = 0; i < node->n_inputs; i++) {
> -        engine_cleanup(node->inputs[i].node);
> +    for (size_t i = 0; i < n_count; i++) {
> +        if (nodes[i]->init) {
> +            nodes[i]->init(nodes[i]);
> +        }
>      }
> -    if (node->cleanup) {
> -        node->cleanup(node);
> +}
> +
> +void
> +engine_cleanup(struct engine_node **nodes, size_t n_count)
> +{
> +    for (size_t i = 0; i < n_count; i++) {
> +        if (nodes[i]->cleanup) {
> +            nodes[i]->cleanup(nodes[i]);
> +        }
>      }
> +    free(nodes);
>  }
>
>  struct engine_node *
> @@ -128,16 +171,66 @@ engine_ovsdb_node_add_index(struct engine_node *node, const char *name,
>      ed->n_indexes ++;
>  }
>
> +void
> +engine_set_node_state_at(struct engine_node *node,
> +                         enum engine_node_state state,
> +                         const char *where)
> +{
> +    if (node->state == state) {
> +        return;
> +    }
> +
> +    VLOG_DBG("%s: node: %s, old_state %s, new_state %s",
> +             where, node->name,
> +             engine_node_state_name[node->state],
> +             engine_node_state_name[state]);
> +
> +    node->state = state;
> +}
> +
> +static bool
> +engine_node_valid(struct engine_node *node)
> +{
> +    return (node->state == EN_UPDATED || node->state == EN_VALID);
> +}
> +
> +bool
> +engine_node_changed(struct engine_node *node)
> +{
> +    return node->state == EN_UPDATED;
> +}
> +
> +bool
> +engine_has_run(struct engine_node *root_node)
> +{
> +    return root_node->state != EN_STALE;
> +}
> +
>  bool
> -engine_has_run(struct engine_node *node, uint64_t run_id)
> +engine_aborted(struct engine_node *node)
>  {
> -    return node->run_id == run_id;
> +    return node->state == EN_ABORTED;
> +}
> +
> +void
> +engine_init_run(struct engine_node **nodes, size_t n_count,
> +                struct engine_node *root_node)
> +{
> +    /* No need to reinitialize if last run didn't happen. */
> +    if (!engine_has_run(root_node)) {
> +        return;
> +    }
> +
> +    VLOG_DBG("Initializing new run");
> +    for (size_t i = 0; i < n_count; i++) {
> +        engine_set_node_state(nodes[i], EN_STALE);
> +    }
>  }
>
>  /* Do a full recompute (or at least try). If we're not allowed then
>   * mark the node as "aborted".
>   */
> -static bool
> +static void
>  engine_recompute(struct engine_node *node, bool forced, bool allowed)
>  {
>      VLOG_DBG("node: %s, recompute (%s)", node->name,
> @@ -145,12 +238,12 @@ engine_recompute(struct engine_node *node, bool forced, bool allowed)
>
>      if (!allowed) {
>          VLOG_DBG("node: %s, recompute aborted", node->name);
> -        return false;
> +        engine_set_node_state(node, EN_ABORTED);
> +        return;
>      }
>
> +    /* Run the node handler which might change state. */
>      node->run(node);
> -    VLOG_DBG("node: %s, changed: %d", node->name, node->changed);
> -    return true;
>  }
>
>  /* Return true if the node could be computed without triggerring a full
> @@ -161,7 +254,7 @@ engine_compute(struct engine_node *node, bool recompute_allowed)
>  {
>      for (size_t i = 0; i < node->n_inputs; i++) {
>          /* If the input node data changed call its change handler. */
> -        if (node->inputs[i].node->changed) {
> +        if (node->inputs[i].node->state == EN_UPDATED) {
>              VLOG_DBG("node: %s, handle change for input %s",
>                       node->name, node->inputs[i].node->name);
>
> @@ -172,57 +265,61 @@ engine_compute(struct engine_node *node, bool recompute_allowed)
>                  VLOG_DBG("node: %s, can't handle change for input %s, "
>                           "fall back to recompute",
>                           node->name, node->inputs[i].node->name);
> -                if (!engine_recompute(node, false, recompute_allowed)) {
> +                engine_recompute(node, false, recompute_allowed);
> +                if (engine_aborted(node)) {
>                      return false;
>                  }
>              }
>          }
>      }
> -
>      return true;
>  }
>
> -bool engine_run(struct engine_node *node, uint64_t run_id)
> +static void
> +engine_run_node(struct engine_node *node)
>  {
> -    if (node->run_id == run_id) {
> -        /* The node was already updated in this run (could be input for
> -         * multiple other nodes). Stop processing.
> -         */
> -        return true;
> -    }
> -
> -    /* Initialize the node for this run. */
> -    node->run_id = run_id;
> -    node->changed = false;
> -
>      if (!node->n_inputs) {
> +        /* Run the node handler which might change state. */
>          node->run(node);
> -        VLOG_DBG("node: %s, changed: %d", node->name, node->changed);
> -        return true;
> +        return;
>      }
>
> +    bool input_stale = false;
>      for (size_t i = 0; i < node->n_inputs; i++) {
> -        if (!engine_run(node->inputs[i].node, run_id)) {
> -            return false;
> +        if (!engine_node_valid(node->inputs[i].node)) {
> +            /* If the input node aborted computation, move to EN_ABORTED.
> +             * This will be propagated to following nodes.
> +             */
> +            if (engine_aborted(node->inputs[i].node)) {
> +                engine_set_node_state(node, EN_ABORTED);
> +            }
> +
> +            input_stale = true;
>          }
>      }
>
> -    bool need_compute = false;
> +    /* If at least one input is stale, don't change state. */
> +    if (input_stale) {
> +        return;
> +    }
>
>      if (engine_force_recompute) {
> -        return engine_recompute(node, true, !engine_abort_recompute);
> +        engine_recompute(node, true, !engine_abort_recompute);
> +        return;
>      }
>
>      /* If any of the inputs updated data but there is no change_handler, then
>       * recompute the current node too.
>       */
> +    bool need_compute = false;
>      for (size_t i = 0; i < node->n_inputs; i++) {
> -        if (node->inputs[i].node->changed) {
> +        if (node->inputs[i].node->state == EN_UPDATED) {
>              need_compute = true;
>
>              /* Trigger a recompute if we don't have a change handler. */
>              if (!node->inputs[i].change_handler) {
> -                return engine_recompute(node, false, !engine_abort_recompute);
> +                engine_recompute(node, false, !engine_abort_recompute);
> +                return;
>              }
>          }
>      }
> @@ -231,33 +328,42 @@ bool engine_run(struct engine_node *node, uint64_t run_id)
>          /* If we couldn't compute the node we either aborted or triggered
>           * a full recompute. In any case, stop processing.
>           */
> -        return engine_compute(node, !engine_abort_recompute);
> +        if (!engine_compute(node, !engine_abort_recompute)) {
> +            return;
> +        }
>      }
>
> -    VLOG_DBG("node: %s, changed: %d", node->name, node->changed);
> -    return true;
> +    /* If we reached this point, either the node was updated or its state is
> +     * still valid.
> +     */
> +    if (!engine_node_changed(node)) {
> +        engine_set_node_state(node, EN_VALID);
> +    }
>  }
>
> -bool
> -engine_need_run(struct engine_node *node, uint64_t run_id)
> +void
> +engine_run(struct engine_node **nodes, size_t n_count)
>  {
> -    size_t i;
> -
> -    if (node->run_id == run_id) {
> -        return false;
> +    for (size_t i = 0; i < n_count; i++) {
> +        engine_run_node(nodes[i]);
>      }
> +}
>
> -    if (!node->n_inputs) {
> -        node->run(node);
> -        VLOG_DBG("input node: %s, changed: %d", node->name, node->changed);
> -        return node->changed;
> -    }
> +bool
> +engine_need_run(struct engine_node **nodes, size_t n_count)
> +{
> +    for (size_t i = 0; i < n_count; i++) {
> +        /* Check only leaf nodes. */
> +        if (nodes[i]->n_inputs) {
> +            continue;
> +        }
>
> -    for (i = 0; i < node->n_inputs; i++) {
> -        if (engine_need_run(node->inputs[i].node, run_id)) {
> +        nodes[i]->run(nodes[i]);
> +        VLOG_DBG("input node: %s, state: %s", nodes[i]->name,
> +                 engine_node_state_name[nodes[i]->state]);
> +        if (nodes[i]->state == EN_UPDATED) {
>              return true;
>          }
>      }
> -
>      return false;
>  }
> diff --git a/lib/inc-proc-eng.h b/lib/inc-proc-eng.h
> index abd41b2..9a35f1f 100644
> --- a/lib/inc-proc-eng.h
> +++ b/lib/inc-proc-eng.h
> @@ -82,10 +82,21 @@ struct engine_node_input {
>      bool (*change_handler)(struct engine_node *node);
>  };
>
> -struct engine_node {
> -    /* A unique id to distinguish each iteration of the engine_run(). */
> -    uint64_t run_id;
> +enum engine_node_state {
> +    EN_STALE,     /* Data in the node is not up to date with the DB. */
> +    EN_UPDATED,   /* Data in the node is valid but was updated during the
> +                   * last run.
> +                   */
> +    EN_VALID,     /* Data in the node is valid and didn't change during the
> +                   * last run.
> +                   */
> +    EN_ABORTED,   /* During the last run, processing was aborted for
> +                   * this node.
> +                   */
> +    EN_STATE_MAX,
> +};
>
> +struct engine_node {
>      /* A unique name for each node. */
>      char *name;
>
> @@ -102,8 +113,8 @@ struct engine_node {
>       * node. */
>      void *data;
>
> -    /* Whether the data changed in the last engine run. */
> -    bool changed;
> +    /* State of the node after the last engine run. */
> +    enum engine_node_state state;
>
>      /* Method to initialize data. It may be NULL. */
>      void (*init)(struct engine_node *);
> @@ -116,23 +127,35 @@ struct engine_node {
>      void (*run)(struct engine_node *);
>  };
>
> -/* Initialize the data for the engine nodes recursively. It calls each node's
> +/* Return the array of topologically sorted nodes when starting from
> + * 'root_node'. Stores the number of nodes in 'n_count'.
> + * It should be called before the main loop.
> + */
> +struct engine_node **engine_get_nodes(struct engine_node *root_node,
> +                                      size_t *n_count);
> +
> +/* Initialize the data for the engine nodes. It calls each node's
>   * init() method if not NULL. It should be called before the main loop. */
> -void engine_init(struct engine_node *);
> +void engine_init(struct engine_node **nodes, size_t n_count);
> +
> +/* Initialize the engine nodes for a new run. It should be called in the
> + * main processing loop before every potential engine_run().
> + */
> +void engine_init_run(struct engine_node **nodes, size_t n_count,
> +                     struct engine_node *root_node);
>
>  /* Execute the processing recursively, which should be called in the main
> - * loop. Returns true if the execution is compelte, false if it is aborted,
> - * which could happen when engine_abort_recompute is set. */
> -bool engine_run(struct engine_node *, uint64_t run_id);
> + * loop. Updates the engine node's states accordingly.
> + */
> +void engine_run(struct engine_node **nodes, size_t n_count);
>
> -/* Clean up the data for the engine nodes recursively. It calls each node's
> +/* Clean up the data for the engine nodes. It calls each node's
>   * cleanup() method if not NULL. It should be called before the program
>   * terminates. */
> -void engine_cleanup(struct engine_node *);
> +void engine_cleanup(struct engine_node **nodes, size_t n_count);
>
>  /* Check if engine needs to run but didn't. */
> -bool
> -engine_need_run(struct engine_node *, uint64_t run_id);
> +bool engine_need_run(struct engine_node **nodes, size_t n_count);
>
>  /* Get the input node with <name> for <node> */
>  struct engine_node * engine_get_input(const char *input_name,
> @@ -159,8 +182,22 @@ const struct engine_context * engine_get_context(void);
>
>  void engine_set_context(const struct engine_context *);
>
> -/* Return true if the engine has run for 'node' in the 'run_id' iteration. */
> -bool engine_has_run(struct engine_node *node, uint64_t run_id);
> +void engine_set_node_state_at(struct engine_node *node,
> +                              enum engine_node_state state,
> +                              const char *where);
> +
> +/* Return true if during the last iteration the node's data was updated. */
> +bool engine_node_changed(struct engine_node *node);
> +
> +/* Return true if the engine has run for 'node' in the last iteration. */
> +bool engine_has_run(struct engine_node *node);
> +
> +/* Returns true if during the last engine run we had to abort processing. */
> +bool engine_aborted(struct engine_node *node);
> +
> +/* Set the state of the node and log changes. */
> +#define engine_set_node_state(node, state) \
> +    engine_set_node_state_at(node, state, OVS_SOURCE_LOCATOR)
>
>  struct ed_ovsdb_index {
>      const char *name;
> @@ -187,6 +224,7 @@ void engine_ovsdb_node_add_index(struct engine_node *, const char *name,
>      struct engine_node en_##NAME = { \
>          .name = NAME_STR, \
>          .data = &ed_##NAME, \
> +        .state = EN_STALE, \
>          .init = en_##NAME##_init, \
>          .run = en_##NAME##_run, \
>          .cleanup = en_##NAME##_cleanup, \
> @@ -201,10 +239,10 @@ en_##DB_NAME##_##TBL_NAME##_run(struct engine_node *node) \
>      const struct DB_NAME##rec_##TBL_NAME##_table *table = \
>          EN_OVSDB_GET(node); \
>      if (DB_NAME##rec_##TBL_NAME##_table_track_get_first(table)) { \
> -        node->changed = true; \
> +        engine_set_node_state(node, EN_UPDATED); \
>          return; \
>      } \
> -    node->changed = false; \
> +    engine_set_node_state(node, EN_VALID); \
>  } \
>  static void (*en_##DB_NAME##_##TBL_NAME##_init)(struct engine_node *node) \
>              = NULL; \
>
> _______________________________________________
> dev mailing list
> dev@openvswitch.org
> https://mail.openvswitch.org/mailman/listinfo/ovs-dev
>
Dumitru Ceara Nov. 18, 2019, 9:27 a.m. UTC | #2
On Mon, Nov 18, 2019 at 8:38 AM Numan Siddique <numans@ovn.org> wrote:
>
> Thanks for this series Dumitru. This is really helpful.
>
> Few comments below.
>
>
> On Thu, Nov 14, 2019 at 10:39 PM Dumitru Ceara <dceara@redhat.com> wrote:
> >
> > This commit transforms the 'changed' field in struct engine_node in a
> > 'state' field. Possible node states are:
> > - "Stale": data in the node is not up to date with the DB.
> > - "Updated": data in the node is valid but was updated during
> >   the last run of the engine.
> > - "Valid": data in the node is valid and didn't change during
> >   the last run of the engine.
> > - "Aborted": during the last run, processing was aborted for
> >   this node.
> >
> > This commit also further refactors the I-P engine:
> > - instead of recursively performing all the engine processing a
> >   preprocessing stage is added (engine_get_nodes()) before the main processing
> >   loop is executed in order to topologically sort nodes in the engine such
> >   that all inputs of a given node appear in the sorted array before the node
> >   itself. This simplifies a bit the code in engine_run().
> > - remove the need for using an engine_run_id by using the newly added states.
> >
> > Signed-off-by: Dumitru Ceara <dceara@redhat.com>
> > ---
>
> After applying this patch I  notice that adding a logical switch or
> logical router is resulting
> the engine_run() to be called twice  i.e the function
> engine_need_run() always returns true when I
> run "ovn-nbctl ls-add sw0" and engine_run() is called again.
>
> If you enable debug and add a logical switch you will see [1] to be
> hit all the time which is not the case
> before this patch.
>
> Thanks
> Numan

Nice catch, sorry about that.
I forgot to add a check to see if the engine was run on the root node
(en_flow_output) in the iteration. I'll fix it in v4.

Thanks,
Dumitru

>
>
> >  controller/ovn-controller.c |   88 ++++++++++-------
> >  lib/inc-proc-eng.c          |  218 ++++++++++++++++++++++++++++++++-----------
> >  lib/inc-proc-eng.h          |   74 +++++++++++----
> >  3 files changed, 267 insertions(+), 113 deletions(-)
> >
> > diff --git a/controller/ovn-controller.c b/controller/ovn-controller.c
> > index 3922f3d..4f8ceae 100644
> > --- a/controller/ovn-controller.c
> > +++ b/controller/ovn-controller.c
> > @@ -758,10 +758,10 @@ en_ofctrl_is_connected_run(struct engine_node *node)
> >          (struct ed_type_ofctrl_is_connected *)node->data;
> >      if (data->connected != ofctrl_is_connected()) {
> >          data->connected = !data->connected;
> > -        node->changed = true;
> > +        engine_set_node_state(node, EN_UPDATED);
> >          return;
> >      }
> > -    node->changed = false;
> > +    engine_set_node_state(node, EN_VALID);
> >  }
> >
> >  struct ed_type_addr_sets {
> > @@ -811,7 +811,7 @@ en_addr_sets_run(struct engine_node *node)
> >      addr_sets_init(as_table, &as->addr_sets);
> >
> >      as->change_tracked = false;
> > -    node->changed = true;
> > +    engine_set_node_state(node, EN_UPDATED);
> >  }
> >
> >  static bool
> > @@ -830,11 +830,14 @@ addr_sets_sb_address_set_handler(struct engine_node *node)
> >      addr_sets_update(as_table, &as->addr_sets, &as->new,
> >                       &as->deleted, &as->updated);
> >
> > -    node->changed = !sset_is_empty(&as->new) || !sset_is_empty(&as->deleted)
> > -                    || !sset_is_empty(&as->updated);
> > +    if (!sset_is_empty(&as->new) || !sset_is_empty(&as->deleted) ||
> > +            !sset_is_empty(&as->updated)) {
> > +        engine_set_node_state(node, EN_UPDATED);
> > +    } else {
> > +        engine_set_node_state(node, EN_VALID);
> > +    }
> >
> >      as->change_tracked = true;
> > -    node->changed = true;
> >      return true;
> >  }
> >
> > @@ -885,7 +888,7 @@ en_port_groups_run(struct engine_node *node)
> >      port_groups_init(pg_table, &pg->port_groups);
> >
> >      pg->change_tracked = false;
> > -    node->changed = true;
> > +    engine_set_node_state(node, EN_UPDATED);
> >  }
> >
> >  static bool
> > @@ -904,11 +907,14 @@ port_groups_sb_port_group_handler(struct engine_node *node)
> >      port_groups_update(pg_table, &pg->port_groups, &pg->new,
> >                       &pg->deleted, &pg->updated);
> >
> > -    node->changed = !sset_is_empty(&pg->new) || !sset_is_empty(&pg->deleted)
> > -                    || !sset_is_empty(&pg->updated);
> > +    if (!sset_is_empty(&pg->new) || !sset_is_empty(&pg->deleted) ||
> > +            !sset_is_empty(&pg->updated)) {
> > +        engine_set_node_state(node, EN_UPDATED);
> > +    } else {
> > +        engine_set_node_state(node, EN_VALID);
> > +    }
> >
> >      pg->change_tracked = true;
> > -    node->changed = true;
> >      return true;
> >  }
> >
> > @@ -1091,7 +1097,7 @@ en_runtime_data_run(struct engine_node *node)
> >      update_ct_zones(local_lports, local_datapaths, ct_zones,
> >                      ct_zone_bitmap, pending_ct_zones);
> >
> > -    node->changed = true;
> > +    engine_set_node_state(node, EN_UPDATED);
> >  }
> >
> >  static bool
> > @@ -1157,10 +1163,10 @@ en_mff_ovn_geneve_run(struct engine_node *node)
> >      enum mf_field_id mff_ovn_geneve = ofctrl_get_mf_field_id();
> >      if (data->mff_ovn_geneve != mff_ovn_geneve) {
> >          data->mff_ovn_geneve = mff_ovn_geneve;
> > -        node->changed = true;
> > +        engine_set_node_state(node, EN_UPDATED);
> >          return;
> >      }
> > -    node->changed = false;
> > +    engine_set_node_state(node, EN_VALID);
> >  }
> >
> >  struct ed_type_flow_output {
> > @@ -1322,7 +1328,7 @@ en_flow_output_run(struct engine_node *node)
> >                   active_tunnels,
> >                   flow_table);
> >
> > -    node->changed = true;
> > +    engine_set_node_state(node, EN_UPDATED);
> >  }
> >
> >  static bool
> > @@ -1404,7 +1410,7 @@ flow_output_sb_logical_flow_handler(struct engine_node *node)
> >                flow_table, group_table, meter_table, lfrr,
> >                conj_id_ofs);
> >
> > -    node->changed = true;
> > +    engine_set_node_state(node, EN_UPDATED);
> >      return handled;
> >  }
> >
> > @@ -1427,7 +1433,7 @@ flow_output_sb_mac_binding_handler(struct engine_node *node)
> >      lflow_handle_changed_neighbors(sbrec_port_binding_by_name,
> >              mac_binding_table, flow_table);
> >
> > -    node->changed = true;
> > +    engine_set_node_state(node, EN_UPDATED);
> >      return true;
> >  }
> >
> > @@ -1531,7 +1537,7 @@ flow_output_sb_port_binding_handler(struct engine_node *node)
> >              chassis, ct_zones, local_datapaths,
> >              active_tunnels, flow_table);
> >
> > -    node->changed = true;
> > +    engine_set_node_state(node, EN_UPDATED);
> >      return true;
> >  }
> >
> > @@ -1580,7 +1586,7 @@ flow_output_sb_multicast_group_handler(struct engine_node *node)
> >              mff_ovn_geneve, chassis, ct_zones, local_datapaths,
> >              flow_table);
> >
> > -    node->changed = true;
> > +    engine_set_node_state(node, EN_UPDATED);
> >      return true;
> >
> >  }
> > @@ -1694,7 +1700,9 @@ _flow_output_resource_ref_handler(struct engine_node *node,
> >                      conj_id_ofs, &changed)) {
> >              return false;
> >          }
> > -        node->changed = changed || node->changed;
> > +        if (changed) {
> > +            engine_set_node_state(node, EN_UPDATED);
> > +        }
> >      }
> >      SSET_FOR_EACH (ref_name, updated) {
> >          if (!lflow_handle_changed_ref(ref_type, ref_name,
> > @@ -1707,7 +1715,9 @@ _flow_output_resource_ref_handler(struct engine_node *node,
> >                      conj_id_ofs, &changed)) {
> >              return false;
> >          }
> > -        node->changed = changed || node->changed;
> > +        if (changed) {
> > +            engine_set_node_state(node, EN_UPDATED);
> > +        }
> >      }
> >      SSET_FOR_EACH (ref_name, new) {
> >          if (!lflow_handle_changed_ref(ref_type, ref_name,
> > @@ -1720,7 +1730,9 @@ _flow_output_resource_ref_handler(struct engine_node *node,
> >                      conj_id_ofs, &changed)) {
> >              return false;
> >          }
> > -        node->changed = changed || node->changed;
> > +        if (changed) {
> > +            engine_set_node_state(node, EN_UPDATED);
> > +        }
> >      }
> >
> >      return true;
> > @@ -1922,7 +1934,11 @@ main(int argc, char *argv[])
> >      engine_add_input(&en_runtime_data, &en_sb_port_binding,
> >                       runtime_data_sb_port_binding_handler);
> >
> > -    engine_init(&en_flow_output);
> > +    /* Get the sorted engine nodes to be used for every engine run. */
> > +    size_t en_count = 0;
> > +    struct engine_node **en_nodes = engine_get_nodes(&en_flow_output,
> > +                                                     &en_count);
> > +    engine_init(en_nodes, en_count);
> >
> >      ofctrl_init(&ed_flow_output.group_table,
> >                  &ed_flow_output.meter_table,
> > @@ -1941,9 +1957,6 @@ main(int argc, char *argv[])
> >      unixctl_command_register("inject-pkt", "MICROFLOW", 1, 1, inject_pkt,
> >                               &pending_pkt);
> >
> > -    uint64_t engine_run_id = 0;
> > -    bool engine_run_done = true;
> > -
> >      unsigned int ovs_cond_seqno = UINT_MAX;
> >      unsigned int ovnsb_cond_seqno = UINT_MAX;
> >
> > @@ -1951,7 +1964,7 @@ main(int argc, char *argv[])
> >      exiting = false;
> >      restart = false;
> >      while (!exiting) {
> > -        engine_run_id++;
> > +        engine_init_run(en_nodes, en_count, &en_flow_output);
> >
> >          update_sb_db(ovs_idl_loop.idl, ovnsb_idl_loop.idl);
> >          update_ssl_config(ovsrec_ssl_table_get(ovs_idl_loop.idl));
> > @@ -2044,15 +2057,13 @@ main(int argc, char *argv[])
> >                               * this round of engine_run and continue processing
> >                               * acculated changes incrementally later when
> >                               * ofctrl_can_put() returns true. */
> > -                            if (engine_run_done) {
> > +                            if (!engine_aborted(&en_flow_output)) {
> >                                  engine_set_abort_recompute(true);
> > -                                engine_run_done = engine_run(&en_flow_output,
> > -                                                             engine_run_id);
> > +                                engine_run(en_nodes, en_count);
> >                              }
> >                          } else {
> >                              engine_set_abort_recompute(false);
> > -                            engine_run_done = true;
> > -                            engine_run(&en_flow_output, engine_run_id);
> > +                            engine_run(en_nodes, en_count);
> >                          }
> >                      }
> >                      stopwatch_stop(CONTROLLER_LOOP_STOPWATCH_NAME,
> > @@ -2071,7 +2082,7 @@ main(int argc, char *argv[])
> >                                 sbrec_meter_table_get(ovnsb_idl_loop.idl),
> >                                 get_nb_cfg(sbrec_sb_global_table_get(
> >                                                ovnsb_idl_loop.idl)),
> > -                               en_flow_output.changed);
> > +                               engine_node_changed(&en_flow_output));
> >                      pinctrl_run(ovnsb_idl_txn,
> >                                  sbrec_datapath_binding_by_key,
> >                                  sbrec_port_binding_by_datapath,
> > @@ -2087,7 +2098,7 @@ main(int argc, char *argv[])
> >                                  &ed_runtime_data.local_datapaths,
> >                                  &ed_runtime_data.active_tunnels);
> >
> > -                    if (en_runtime_data.changed) {
> > +                    if (engine_node_changed(&en_runtime_data)) {
> >                          update_sb_monitors(ovnsb_idl_loop.idl, chassis,
> >                                             &ed_runtime_data.local_lports,
> >                                             &ed_runtime_data.local_datapaths);
> > @@ -2095,17 +2106,17 @@ main(int argc, char *argv[])
> >                  }
> >
> >              }
> > -            if (engine_need_run(&en_flow_output, engine_run_id)) {
> > +            if (engine_need_run(en_nodes, en_count)) {
> >                  VLOG_DBG("engine did not run, force recompute next time: "
> >                              "br_int %p, chassis %p", br_int, chassis);
> >                  engine_set_force_recompute(true);
> >                  poll_immediate_wake();
> > -            } else if (!engine_run_done) {
> > +            } else if (engine_aborted(&en_flow_output)) {
> >                  VLOG_DBG("engine was aborted, force recompute next time: "
> >                           "br_int %p, chassis %p", br_int, chassis);
> >                  engine_set_force_recompute(true);
> >                  poll_immediate_wake();
> > -            } else if (!engine_has_run(&en_flow_output, engine_run_id)) {
> > +            } else if (!engine_has_run(&en_flow_output)) {
> >                  VLOG_DBG("engine did not run, and it was not needed"
> >                           " either: br_int %p, chassis %p",
> >                           br_int, chassis);
> > @@ -2133,8 +2144,7 @@ main(int argc, char *argv[])
> >                      }
> >                  } else {
> >                      VLOG_DBG("Pending_pkt conn but br_int %p or chassis "
> > -                             "%p not ready. run-id: %"PRIu64, br_int,
> > -                             chassis, engine_run_id);
> > +                             "%p not ready.", br_int, chassis);
> >                      unixctl_command_reply_error(pending_pkt.conn,
> >                          "ovn-controller not ready.");
> >                  }
> > @@ -2183,7 +2193,7 @@ main(int argc, char *argv[])
> >      }
> >
> >      engine_set_context(NULL);
> > -    engine_cleanup(&en_flow_output);
> > +    engine_cleanup(en_nodes, en_count);
> >
> >      /* It's time to exit.  Clean up the databases if we are not restarting */
> >      if (!restart) {
> > diff --git a/lib/inc-proc-eng.c b/lib/inc-proc-eng.c
> > index 8a085e2..b438a15 100644
> > --- a/lib/inc-proc-eng.c
> > +++ b/lib/inc-proc-eng.c
> > @@ -34,6 +34,13 @@ static bool engine_force_recompute = false;
> >  static bool engine_abort_recompute = false;
> >  static const struct engine_context *engine_context;
> >
> > +static const char *engine_node_state_name[EN_STATE_MAX] = {
> > +    [EN_STALE]   = "Stale",
> > +    [EN_UPDATED] = "Updated",
> > +    [EN_VALID]   = "Valid",
> > +    [EN_ABORTED] = "Aborted",
> > +};
> > +
> >  void
> >  engine_set_force_recompute(bool val)
> >  {
> > @@ -58,26 +65,62 @@ engine_set_context(const struct engine_context *ctx)
> >      engine_context = ctx;
> >  }
> >
> > -void
> > -engine_init(struct engine_node *node)
> > +/* Builds the topologically sorted 'sorted_nodes' array starting from
> > + * 'node'.
> > + */
> > +static struct engine_node **
> > +engine_topo_sort(struct engine_node *node, struct engine_node **sorted_nodes,
> > +                 size_t *n_count, size_t *n_size)
> >  {
> > +    /* It's not so efficient to walk the array of already sorted nodes but
> > +     * we know that sorting is done only once at startup so it's ok for now.
> > +     */
> > +    for (size_t i = 0; i < *n_count; i++) {
> > +        if (sorted_nodes[i] == node) {
> > +            return sorted_nodes;
> > +        }
> > +    }
> > +
> >      for (size_t i = 0; i < node->n_inputs; i++) {
> > -        engine_init(node->inputs[i].node);
> > +        sorted_nodes = engine_topo_sort(node->inputs[i].node, sorted_nodes,
> > +                                        n_count, n_size);
> >      }
> > -    if (node->init) {
> > -        node->init(node);
> > +    if (*n_count == *n_size) {
> > +        sorted_nodes = x2nrealloc(sorted_nodes, n_size, sizeof *sorted_nodes);
> >      }
> > +    sorted_nodes[(*n_count)] = node;
> > +    (*n_count)++;
> > +    return sorted_nodes;
> > +}
> > +
> > +struct engine_node **
> > +engine_get_nodes(struct engine_node *root_node, size_t *n_count)
> > +{
> > +    size_t n_size = 0;
> > +
> > +    *n_count = 0;
> > +    return engine_topo_sort(root_node, NULL, n_count, &n_size);
> >  }
> >
> >  void
> > -engine_cleanup(struct engine_node *node)
> > +engine_init(struct engine_node **nodes, size_t n_count)
> >  {
> > -    for (size_t i = 0; i < node->n_inputs; i++) {
> > -        engine_cleanup(node->inputs[i].node);
> > +    for (size_t i = 0; i < n_count; i++) {
> > +        if (nodes[i]->init) {
> > +            nodes[i]->init(nodes[i]);
> > +        }
> >      }
> > -    if (node->cleanup) {
> > -        node->cleanup(node);
> > +}
> > +
> > +void
> > +engine_cleanup(struct engine_node **nodes, size_t n_count)
> > +{
> > +    for (size_t i = 0; i < n_count; i++) {
> > +        if (nodes[i]->cleanup) {
> > +            nodes[i]->cleanup(nodes[i]);
> > +        }
> >      }
> > +    free(nodes);
> >  }
> >
> >  struct engine_node *
> > @@ -128,16 +171,66 @@ engine_ovsdb_node_add_index(struct engine_node *node, const char *name,
> >      ed->n_indexes ++;
> >  }
> >
> > +void
> > +engine_set_node_state_at(struct engine_node *node,
> > +                         enum engine_node_state state,
> > +                         const char *where)
> > +{
> > +    if (node->state == state) {
> > +        return;
> > +    }
> > +
> > +    VLOG_DBG("%s: node: %s, old_state %s, new_state %s",
> > +             where, node->name,
> > +             engine_node_state_name[node->state],
> > +             engine_node_state_name[state]);
> > +
> > +    node->state = state;
> > +}
> > +
> > +static bool
> > +engine_node_valid(struct engine_node *node)
> > +{
> > +    return (node->state == EN_UPDATED || node->state == EN_VALID);
> > +}
> > +
> > +bool
> > +engine_node_changed(struct engine_node *node)
> > +{
> > +    return node->state == EN_UPDATED;
> > +}
> > +
> > +bool
> > +engine_has_run(struct engine_node *root_node)
> > +{
> > +    return root_node->state != EN_STALE;
> > +}
> > +
> >  bool
> > -engine_has_run(struct engine_node *node, uint64_t run_id)
> > +engine_aborted(struct engine_node *node)
> >  {
> > -    return node->run_id == run_id;
> > +    return node->state == EN_ABORTED;
> > +}
> > +
> > +void
> > +engine_init_run(struct engine_node **nodes, size_t n_count,
> > +                struct engine_node *root_node)
> > +{
> > +    /* No need to reinitialize if last run didn't happen. */
> > +    if (!engine_has_run(root_node)) {
> > +        return;
> > +    }
> > +
> > +    VLOG_DBG("Initializing new run");
> > +    for (size_t i = 0; i < n_count; i++) {
> > +        engine_set_node_state(nodes[i], EN_STALE);
> > +    }
> >  }
> >
> >  /* Do a full recompute (or at least try). If we're not allowed then
> >   * mark the node as "aborted".
> >   */
> > -static bool
> > +static void
> >  engine_recompute(struct engine_node *node, bool forced, bool allowed)
> >  {
> >      VLOG_DBG("node: %s, recompute (%s)", node->name,
> > @@ -145,12 +238,12 @@ engine_recompute(struct engine_node *node, bool forced, bool allowed)
> >
> >      if (!allowed) {
> >          VLOG_DBG("node: %s, recompute aborted", node->name);
> > -        return false;
> > +        engine_set_node_state(node, EN_ABORTED);
> > +        return;
> >      }
> >
> > +    /* Run the node handler which might change state. */
> >      node->run(node);
> > -    VLOG_DBG("node: %s, changed: %d", node->name, node->changed);
> > -    return true;
> >  }
> >
> >  /* Return true if the node could be computed without triggerring a full
> > @@ -161,7 +254,7 @@ engine_compute(struct engine_node *node, bool recompute_allowed)
> >  {
> >      for (size_t i = 0; i < node->n_inputs; i++) {
> >          /* If the input node data changed call its change handler. */
> > -        if (node->inputs[i].node->changed) {
> > +        if (node->inputs[i].node->state == EN_UPDATED) {
> >              VLOG_DBG("node: %s, handle change for input %s",
> >                       node->name, node->inputs[i].node->name);
> >
> > @@ -172,57 +265,61 @@ engine_compute(struct engine_node *node, bool recompute_allowed)
> >                  VLOG_DBG("node: %s, can't handle change for input %s, "
> >                           "fall back to recompute",
> >                           node->name, node->inputs[i].node->name);
> > -                if (!engine_recompute(node, false, recompute_allowed)) {
> > +                engine_recompute(node, false, recompute_allowed);
> > +                if (engine_aborted(node)) {
> >                      return false;
> >                  }
> >              }
> >          }
> >      }
> > -
> >      return true;
> >  }
> >
> > -bool engine_run(struct engine_node *node, uint64_t run_id)
> > +static void
> > +engine_run_node(struct engine_node *node)
> >  {
> > -    if (node->run_id == run_id) {
> > -        /* The node was already updated in this run (could be input for
> > -         * multiple other nodes). Stop processing.
> > -         */
> > -        return true;
> > -    }
> > -
> > -    /* Initialize the node for this run. */
> > -    node->run_id = run_id;
> > -    node->changed = false;
> > -
> >      if (!node->n_inputs) {
> > +        /* Run the node handler which might change state. */
> >          node->run(node);
> > -        VLOG_DBG("node: %s, changed: %d", node->name, node->changed);
> > -        return true;
> > +        return;
> >      }
> >
> > +    bool input_stale = false;
> >      for (size_t i = 0; i < node->n_inputs; i++) {
> > -        if (!engine_run(node->inputs[i].node, run_id)) {
> > -            return false;
> > +        if (!engine_node_valid(node->inputs[i].node)) {
> > +            /* If the input node aborted computation, move to EN_ABORTED.
> > +             * This will be propagated to following nodes.
> > +             */
> > +            if (engine_aborted(node->inputs[i].node)) {
> > +                engine_set_node_state(node, EN_ABORTED);
> > +            }
> > +
> > +            input_stale = true;
> >          }
> >      }
> >
> > -    bool need_compute = false;
> > +    /* If at least one input is stale, don't change state. */
> > +    if (input_stale) {
> > +        return;
> > +    }
> >
> >      if (engine_force_recompute) {
> > -        return engine_recompute(node, true, !engine_abort_recompute);
> > +        engine_recompute(node, true, !engine_abort_recompute);
> > +        return;
> >      }
> >
> >      /* If any of the inputs updated data but there is no change_handler, then
> >       * recompute the current node too.
> >       */
> > +    bool need_compute = false;
> >      for (size_t i = 0; i < node->n_inputs; i++) {
> > -        if (node->inputs[i].node->changed) {
> > +        if (node->inputs[i].node->state == EN_UPDATED) {
> >              need_compute = true;
> >
> >              /* Trigger a recompute if we don't have a change handler. */
> >              if (!node->inputs[i].change_handler) {
> > -                return engine_recompute(node, false, !engine_abort_recompute);
> > +                engine_recompute(node, false, !engine_abort_recompute);
> > +                return;
> >              }
> >          }
> >      }
> > @@ -231,33 +328,42 @@ bool engine_run(struct engine_node *node, uint64_t run_id)
> >          /* If we couldn't compute the node we either aborted or triggered
> >           * a full recompute. In any case, stop processing.
> >           */
> > -        return engine_compute(node, !engine_abort_recompute);
> > +        if (!engine_compute(node, !engine_abort_recompute)) {
> > +            return;
> > +        }
> >      }
> >
> > -    VLOG_DBG("node: %s, changed: %d", node->name, node->changed);
> > -    return true;
> > +    /* If we reached this point, either the node was updated or its state is
> > +     * still valid.
> > +     */
> > +    if (!engine_node_changed(node)) {
> > +        engine_set_node_state(node, EN_VALID);
> > +    }
> >  }
> >
> > -bool
> > -engine_need_run(struct engine_node *node, uint64_t run_id)
> > +void
> > +engine_run(struct engine_node **nodes, size_t n_count)
> >  {
> > -    size_t i;
> > -
> > -    if (node->run_id == run_id) {
> > -        return false;
> > +    for (size_t i = 0; i < n_count; i++) {
> > +        engine_run_node(nodes[i]);
> >      }
> > +}
> >
> > -    if (!node->n_inputs) {
> > -        node->run(node);
> > -        VLOG_DBG("input node: %s, changed: %d", node->name, node->changed);
> > -        return node->changed;
> > -    }
> > +bool
> > +engine_need_run(struct engine_node **nodes, size_t n_count)
> > +{
> > +    for (size_t i = 0; i < n_count; i++) {
> > +        /* Check only leaf nodes. */
> > +        if (nodes[i]->n_inputs) {
> > +            continue;
> > +        }
> >
> > -    for (i = 0; i < node->n_inputs; i++) {
> > -        if (engine_need_run(node->inputs[i].node, run_id)) {
> > +        nodes[i]->run(nodes[i]);
> > +        VLOG_DBG("input node: %s, state: %s", nodes[i]->name,
> > +                 engine_node_state_name[nodes[i]->state]);
> > +        if (nodes[i]->state == EN_UPDATED) {
> >              return true;
> >          }
> >      }
> > -
> >      return false;
> >  }
> > diff --git a/lib/inc-proc-eng.h b/lib/inc-proc-eng.h
> > index abd41b2..9a35f1f 100644
> > --- a/lib/inc-proc-eng.h
> > +++ b/lib/inc-proc-eng.h
> > @@ -82,10 +82,21 @@ struct engine_node_input {
> >      bool (*change_handler)(struct engine_node *node);
> >  };
> >
> > -struct engine_node {
> > -    /* A unique id to distinguish each iteration of the engine_run(). */
> > -    uint64_t run_id;
> > +enum engine_node_state {
> > +    EN_STALE,     /* Data in the node is not up to date with the DB. */
> > +    EN_UPDATED,   /* Data in the node is valid but was updated during the
> > +                   * last run.
> > +                   */
> > +    EN_VALID,     /* Data in the node is valid and didn't change during the
> > +                   * last run.
> > +                   */
> > +    EN_ABORTED,   /* During the last run, processing was aborted for
> > +                   * this node.
> > +                   */
> > +    EN_STATE_MAX,
> > +};
> >
> > +struct engine_node {
> >      /* A unique name for each node. */
> >      char *name;
> >
> > @@ -102,8 +113,8 @@ struct engine_node {
> >       * node. */
> >      void *data;
> >
> > -    /* Whether the data changed in the last engine run. */
> > -    bool changed;
> > +    /* State of the node after the last engine run. */
> > +    enum engine_node_state state;
> >
> >      /* Method to initialize data. It may be NULL. */
> >      void (*init)(struct engine_node *);
> > @@ -116,23 +127,35 @@ struct engine_node {
> >      void (*run)(struct engine_node *);
> >  };
> >
> > -/* Initialize the data for the engine nodes recursively. It calls each node's
> > +/* Return the array of topologically sorted nodes when starting from
> > + * 'root_node'. Stores the number of nodes in 'n_count'.
> > + * It should be called before the main loop.
> > + */
> > +struct engine_node **engine_get_nodes(struct engine_node *root_node,
> > +                                      size_t *n_count);
> > +
> > +/* Initialize the data for the engine nodes. It calls each node's
> >   * init() method if not NULL. It should be called before the main loop. */
> > -void engine_init(struct engine_node *);
> > +void engine_init(struct engine_node **nodes, size_t n_count);
> > +
> > +/* Initialize the engine nodes for a new run. It should be called in the
> > + * main processing loop before every potential engine_run().
> > + */
> > +void engine_init_run(struct engine_node **nodes, size_t n_count,
> > +                     struct engine_node *root_node);
> >
> >  /* Execute the processing recursively, which should be called in the main
> > - * loop. Returns true if the execution is compelte, false if it is aborted,
> > - * which could happen when engine_abort_recompute is set. */
> > -bool engine_run(struct engine_node *, uint64_t run_id);
> > + * loop. Updates the engine node's states accordingly.
> > + */
> > +void engine_run(struct engine_node **nodes, size_t n_count);
> >
> > -/* Clean up the data for the engine nodes recursively. It calls each node's
> > +/* Clean up the data for the engine nodes. It calls each node's
> >   * cleanup() method if not NULL. It should be called before the program
> >   * terminates. */
> > -void engine_cleanup(struct engine_node *);
> > +void engine_cleanup(struct engine_node **nodes, size_t n_count);
> >
> >  /* Check if engine needs to run but didn't. */
> > -bool
> > -engine_need_run(struct engine_node *, uint64_t run_id);
> > +bool engine_need_run(struct engine_node **nodes, size_t n_count);
> >
> >  /* Get the input node with <name> for <node> */
> >  struct engine_node * engine_get_input(const char *input_name,
> > @@ -159,8 +182,22 @@ const struct engine_context * engine_get_context(void);
> >
> >  void engine_set_context(const struct engine_context *);
> >
> > -/* Return true if the engine has run for 'node' in the 'run_id' iteration. */
> > -bool engine_has_run(struct engine_node *node, uint64_t run_id);
> > +void engine_set_node_state_at(struct engine_node *node,
> > +                              enum engine_node_state state,
> > +                              const char *where);
> > +
> > +/* Return true if during the last iteration the node's data was updated. */
> > +bool engine_node_changed(struct engine_node *node);
> > +
> > +/* Return true if the engine has run for 'node' in the last iteration. */
> > +bool engine_has_run(struct engine_node *node);
> > +
> > +/* Returns true if during the last engine run we had to abort processing. */
> > +bool engine_aborted(struct engine_node *node);
> > +
> > +/* Set the state of the node and log changes. */
> > +#define engine_set_node_state(node, state) \
> > +    engine_set_node_state_at(node, state, OVS_SOURCE_LOCATOR)
> >
> >  struct ed_ovsdb_index {
> >      const char *name;
> > @@ -187,6 +224,7 @@ void engine_ovsdb_node_add_index(struct engine_node *, const char *name,
> >      struct engine_node en_##NAME = { \
> >          .name = NAME_STR, \
> >          .data = &ed_##NAME, \
> > +        .state = EN_STALE, \
> >          .init = en_##NAME##_init, \
> >          .run = en_##NAME##_run, \
> >          .cleanup = en_##NAME##_cleanup, \
> > @@ -201,10 +239,10 @@ en_##DB_NAME##_##TBL_NAME##_run(struct engine_node *node) \
> >      const struct DB_NAME##rec_##TBL_NAME##_table *table = \
> >          EN_OVSDB_GET(node); \
> >      if (DB_NAME##rec_##TBL_NAME##_table_track_get_first(table)) { \
> > -        node->changed = true; \
> > +        engine_set_node_state(node, EN_UPDATED); \
> >          return; \
> >      } \
> > -    node->changed = false; \
> > +    engine_set_node_state(node, EN_VALID); \
> >  } \
> >  static void (*en_##DB_NAME##_##TBL_NAME##_init)(struct engine_node *node) \
> >              = NULL; \
> >
> > _______________________________________________
> > dev mailing list
> > dev@openvswitch.org
> > https://mail.openvswitch.org/mailman/listinfo/ovs-dev
> >
>
diff mbox series

Patch

diff --git a/controller/ovn-controller.c b/controller/ovn-controller.c
index 3922f3d..4f8ceae 100644
--- a/controller/ovn-controller.c
+++ b/controller/ovn-controller.c
@@ -758,10 +758,10 @@  en_ofctrl_is_connected_run(struct engine_node *node)
         (struct ed_type_ofctrl_is_connected *)node->data;
     if (data->connected != ofctrl_is_connected()) {
         data->connected = !data->connected;
-        node->changed = true;
+        engine_set_node_state(node, EN_UPDATED);
         return;
     }
-    node->changed = false;
+    engine_set_node_state(node, EN_VALID);
 }
 
 struct ed_type_addr_sets {
@@ -811,7 +811,7 @@  en_addr_sets_run(struct engine_node *node)
     addr_sets_init(as_table, &as->addr_sets);
 
     as->change_tracked = false;
-    node->changed = true;
+    engine_set_node_state(node, EN_UPDATED);
 }
 
 static bool
@@ -830,11 +830,14 @@  addr_sets_sb_address_set_handler(struct engine_node *node)
     addr_sets_update(as_table, &as->addr_sets, &as->new,
                      &as->deleted, &as->updated);
 
-    node->changed = !sset_is_empty(&as->new) || !sset_is_empty(&as->deleted)
-                    || !sset_is_empty(&as->updated);
+    if (!sset_is_empty(&as->new) || !sset_is_empty(&as->deleted) ||
+            !sset_is_empty(&as->updated)) {
+        engine_set_node_state(node, EN_UPDATED);
+    } else {
+        engine_set_node_state(node, EN_VALID);
+    }
 
     as->change_tracked = true;
-    node->changed = true;
     return true;
 }
 
@@ -885,7 +888,7 @@  en_port_groups_run(struct engine_node *node)
     port_groups_init(pg_table, &pg->port_groups);
 
     pg->change_tracked = false;
-    node->changed = true;
+    engine_set_node_state(node, EN_UPDATED);
 }
 
 static bool
@@ -904,11 +907,14 @@  port_groups_sb_port_group_handler(struct engine_node *node)
     port_groups_update(pg_table, &pg->port_groups, &pg->new,
                      &pg->deleted, &pg->updated);
 
-    node->changed = !sset_is_empty(&pg->new) || !sset_is_empty(&pg->deleted)
-                    || !sset_is_empty(&pg->updated);
+    if (!sset_is_empty(&pg->new) || !sset_is_empty(&pg->deleted) ||
+            !sset_is_empty(&pg->updated)) {
+        engine_set_node_state(node, EN_UPDATED);
+    } else {
+        engine_set_node_state(node, EN_VALID);
+    }
 
     pg->change_tracked = true;
-    node->changed = true;
     return true;
 }
 
@@ -1091,7 +1097,7 @@  en_runtime_data_run(struct engine_node *node)
     update_ct_zones(local_lports, local_datapaths, ct_zones,
                     ct_zone_bitmap, pending_ct_zones);
 
-    node->changed = true;
+    engine_set_node_state(node, EN_UPDATED);
 }
 
 static bool
@@ -1157,10 +1163,10 @@  en_mff_ovn_geneve_run(struct engine_node *node)
     enum mf_field_id mff_ovn_geneve = ofctrl_get_mf_field_id();
     if (data->mff_ovn_geneve != mff_ovn_geneve) {
         data->mff_ovn_geneve = mff_ovn_geneve;
-        node->changed = true;
+        engine_set_node_state(node, EN_UPDATED);
         return;
     }
-    node->changed = false;
+    engine_set_node_state(node, EN_VALID);
 }
 
 struct ed_type_flow_output {
@@ -1322,7 +1328,7 @@  en_flow_output_run(struct engine_node *node)
                  active_tunnels,
                  flow_table);
 
-    node->changed = true;
+    engine_set_node_state(node, EN_UPDATED);
 }
 
 static bool
@@ -1404,7 +1410,7 @@  flow_output_sb_logical_flow_handler(struct engine_node *node)
               flow_table, group_table, meter_table, lfrr,
               conj_id_ofs);
 
-    node->changed = true;
+    engine_set_node_state(node, EN_UPDATED);
     return handled;
 }
 
@@ -1427,7 +1433,7 @@  flow_output_sb_mac_binding_handler(struct engine_node *node)
     lflow_handle_changed_neighbors(sbrec_port_binding_by_name,
             mac_binding_table, flow_table);
 
-    node->changed = true;
+    engine_set_node_state(node, EN_UPDATED);
     return true;
 }
 
@@ -1531,7 +1537,7 @@  flow_output_sb_port_binding_handler(struct engine_node *node)
             chassis, ct_zones, local_datapaths,
             active_tunnels, flow_table);
 
-    node->changed = true;
+    engine_set_node_state(node, EN_UPDATED);
     return true;
 }
 
@@ -1580,7 +1586,7 @@  flow_output_sb_multicast_group_handler(struct engine_node *node)
             mff_ovn_geneve, chassis, ct_zones, local_datapaths,
             flow_table);
 
-    node->changed = true;
+    engine_set_node_state(node, EN_UPDATED);
     return true;
 
 }
@@ -1694,7 +1700,9 @@  _flow_output_resource_ref_handler(struct engine_node *node,
                     conj_id_ofs, &changed)) {
             return false;
         }
-        node->changed = changed || node->changed;
+        if (changed) {
+            engine_set_node_state(node, EN_UPDATED);
+        }
     }
     SSET_FOR_EACH (ref_name, updated) {
         if (!lflow_handle_changed_ref(ref_type, ref_name,
@@ -1707,7 +1715,9 @@  _flow_output_resource_ref_handler(struct engine_node *node,
                     conj_id_ofs, &changed)) {
             return false;
         }
-        node->changed = changed || node->changed;
+        if (changed) {
+            engine_set_node_state(node, EN_UPDATED);
+        }
     }
     SSET_FOR_EACH (ref_name, new) {
         if (!lflow_handle_changed_ref(ref_type, ref_name,
@@ -1720,7 +1730,9 @@  _flow_output_resource_ref_handler(struct engine_node *node,
                     conj_id_ofs, &changed)) {
             return false;
         }
-        node->changed = changed || node->changed;
+        if (changed) {
+            engine_set_node_state(node, EN_UPDATED);
+        }
     }
 
     return true;
@@ -1922,7 +1934,11 @@  main(int argc, char *argv[])
     engine_add_input(&en_runtime_data, &en_sb_port_binding,
                      runtime_data_sb_port_binding_handler);
 
-    engine_init(&en_flow_output);
+    /* Get the sorted engine nodes to be used for every engine run. */
+    size_t en_count = 0;
+    struct engine_node **en_nodes = engine_get_nodes(&en_flow_output,
+                                                     &en_count);
+    engine_init(en_nodes, en_count);
 
     ofctrl_init(&ed_flow_output.group_table,
                 &ed_flow_output.meter_table,
@@ -1941,9 +1957,6 @@  main(int argc, char *argv[])
     unixctl_command_register("inject-pkt", "MICROFLOW", 1, 1, inject_pkt,
                              &pending_pkt);
 
-    uint64_t engine_run_id = 0;
-    bool engine_run_done = true;
-
     unsigned int ovs_cond_seqno = UINT_MAX;
     unsigned int ovnsb_cond_seqno = UINT_MAX;
 
@@ -1951,7 +1964,7 @@  main(int argc, char *argv[])
     exiting = false;
     restart = false;
     while (!exiting) {
-        engine_run_id++;
+        engine_init_run(en_nodes, en_count, &en_flow_output);
 
         update_sb_db(ovs_idl_loop.idl, ovnsb_idl_loop.idl);
         update_ssl_config(ovsrec_ssl_table_get(ovs_idl_loop.idl));
@@ -2044,15 +2057,13 @@  main(int argc, char *argv[])
                              * this round of engine_run and continue processing
                              * acculated changes incrementally later when
                              * ofctrl_can_put() returns true. */
-                            if (engine_run_done) {
+                            if (!engine_aborted(&en_flow_output)) {
                                 engine_set_abort_recompute(true);
-                                engine_run_done = engine_run(&en_flow_output,
-                                                             engine_run_id);
+                                engine_run(en_nodes, en_count);
                             }
                         } else {
                             engine_set_abort_recompute(false);
-                            engine_run_done = true;
-                            engine_run(&en_flow_output, engine_run_id);
+                            engine_run(en_nodes, en_count);
                         }
                     }
                     stopwatch_stop(CONTROLLER_LOOP_STOPWATCH_NAME,
@@ -2071,7 +2082,7 @@  main(int argc, char *argv[])
                                sbrec_meter_table_get(ovnsb_idl_loop.idl),
                                get_nb_cfg(sbrec_sb_global_table_get(
                                               ovnsb_idl_loop.idl)),
-                               en_flow_output.changed);
+                               engine_node_changed(&en_flow_output));
                     pinctrl_run(ovnsb_idl_txn,
                                 sbrec_datapath_binding_by_key,
                                 sbrec_port_binding_by_datapath,
@@ -2087,7 +2098,7 @@  main(int argc, char *argv[])
                                 &ed_runtime_data.local_datapaths,
                                 &ed_runtime_data.active_tunnels);
 
-                    if (en_runtime_data.changed) {
+                    if (engine_node_changed(&en_runtime_data)) {
                         update_sb_monitors(ovnsb_idl_loop.idl, chassis,
                                            &ed_runtime_data.local_lports,
                                            &ed_runtime_data.local_datapaths);
@@ -2095,17 +2106,17 @@  main(int argc, char *argv[])
                 }
 
             }
-            if (engine_need_run(&en_flow_output, engine_run_id)) {
+            if (engine_need_run(en_nodes, en_count)) {
                 VLOG_DBG("engine did not run, force recompute next time: "
                             "br_int %p, chassis %p", br_int, chassis);
                 engine_set_force_recompute(true);
                 poll_immediate_wake();
-            } else if (!engine_run_done) {
+            } else if (engine_aborted(&en_flow_output)) {
                 VLOG_DBG("engine was aborted, force recompute next time: "
                          "br_int %p, chassis %p", br_int, chassis);
                 engine_set_force_recompute(true);
                 poll_immediate_wake();
-            } else if (!engine_has_run(&en_flow_output, engine_run_id)) {
+            } else if (!engine_has_run(&en_flow_output)) {
                 VLOG_DBG("engine did not run, and it was not needed"
                          " either: br_int %p, chassis %p",
                          br_int, chassis);
@@ -2133,8 +2144,7 @@  main(int argc, char *argv[])
                     }
                 } else {
                     VLOG_DBG("Pending_pkt conn but br_int %p or chassis "
-                             "%p not ready. run-id: %"PRIu64, br_int,
-                             chassis, engine_run_id);
+                             "%p not ready.", br_int, chassis);
                     unixctl_command_reply_error(pending_pkt.conn,
                         "ovn-controller not ready.");
                 }
@@ -2183,7 +2193,7 @@  main(int argc, char *argv[])
     }
 
     engine_set_context(NULL);
-    engine_cleanup(&en_flow_output);
+    engine_cleanup(en_nodes, en_count);
 
     /* It's time to exit.  Clean up the databases if we are not restarting */
     if (!restart) {
diff --git a/lib/inc-proc-eng.c b/lib/inc-proc-eng.c
index 8a085e2..b438a15 100644
--- a/lib/inc-proc-eng.c
+++ b/lib/inc-proc-eng.c
@@ -34,6 +34,13 @@  static bool engine_force_recompute = false;
 static bool engine_abort_recompute = false;
 static const struct engine_context *engine_context;
 
+static const char *engine_node_state_name[EN_STATE_MAX] = {
+    [EN_STALE]   = "Stale",
+    [EN_UPDATED] = "Updated",
+    [EN_VALID]   = "Valid",
+    [EN_ABORTED] = "Aborted",
+};
+
 void
 engine_set_force_recompute(bool val)
 {
@@ -58,26 +65,62 @@  engine_set_context(const struct engine_context *ctx)
     engine_context = ctx;
 }
 
-void
-engine_init(struct engine_node *node)
+/* Builds the topologically sorted 'sorted_nodes' array starting from
+ * 'node'.
+ */
+static struct engine_node **
+engine_topo_sort(struct engine_node *node, struct engine_node **sorted_nodes,
+                 size_t *n_count, size_t *n_size)
 {
+    /* It's not so efficient to walk the array of already sorted nodes but
+     * we know that sorting is done only once at startup so it's ok for now.
+     */
+    for (size_t i = 0; i < *n_count; i++) {
+        if (sorted_nodes[i] == node) {
+            return sorted_nodes;
+        }
+    }
+
     for (size_t i = 0; i < node->n_inputs; i++) {
-        engine_init(node->inputs[i].node);
+        sorted_nodes = engine_topo_sort(node->inputs[i].node, sorted_nodes,
+                                        n_count, n_size);
     }
-    if (node->init) {
-        node->init(node);
+    if (*n_count == *n_size) {
+        sorted_nodes = x2nrealloc(sorted_nodes, n_size, sizeof *sorted_nodes);
     }
+    sorted_nodes[(*n_count)] = node;
+    (*n_count)++;
+    return sorted_nodes;
+}
+
+struct engine_node **
+engine_get_nodes(struct engine_node *root_node, size_t *n_count)
+{
+    size_t n_size = 0;
+
+    *n_count = 0;
+    return engine_topo_sort(root_node, NULL, n_count, &n_size);
 }
 
 void
-engine_cleanup(struct engine_node *node)
+engine_init(struct engine_node **nodes, size_t n_count)
 {
-    for (size_t i = 0; i < node->n_inputs; i++) {
-        engine_cleanup(node->inputs[i].node);
+    for (size_t i = 0; i < n_count; i++) {
+        if (nodes[i]->init) {
+            nodes[i]->init(nodes[i]);
+        }
     }
-    if (node->cleanup) {
-        node->cleanup(node);
+}
+
+void
+engine_cleanup(struct engine_node **nodes, size_t n_count)
+{
+    for (size_t i = 0; i < n_count; i++) {
+        if (nodes[i]->cleanup) {
+            nodes[i]->cleanup(nodes[i]);
+        }
     }
+    free(nodes);
 }
 
 struct engine_node *
@@ -128,16 +171,66 @@  engine_ovsdb_node_add_index(struct engine_node *node, const char *name,
     ed->n_indexes ++;
 }
 
+void
+engine_set_node_state_at(struct engine_node *node,
+                         enum engine_node_state state,
+                         const char *where)
+{
+    if (node->state == state) {
+        return;
+    }
+
+    VLOG_DBG("%s: node: %s, old_state %s, new_state %s",
+             where, node->name,
+             engine_node_state_name[node->state],
+             engine_node_state_name[state]);
+
+    node->state = state;
+}
+
+static bool
+engine_node_valid(struct engine_node *node)
+{
+    return (node->state == EN_UPDATED || node->state == EN_VALID);
+}
+
+bool
+engine_node_changed(struct engine_node *node)
+{
+    return node->state == EN_UPDATED;
+}
+
+bool
+engine_has_run(struct engine_node *root_node)
+{
+    return root_node->state != EN_STALE;
+}
+
 bool
-engine_has_run(struct engine_node *node, uint64_t run_id)
+engine_aborted(struct engine_node *node)
 {
-    return node->run_id == run_id;
+    return node->state == EN_ABORTED;
+}
+
+void
+engine_init_run(struct engine_node **nodes, size_t n_count,
+                struct engine_node *root_node)
+{
+    /* No need to reinitialize if last run didn't happen. */
+    if (!engine_has_run(root_node)) {
+        return;
+    }
+
+    VLOG_DBG("Initializing new run");
+    for (size_t i = 0; i < n_count; i++) {
+        engine_set_node_state(nodes[i], EN_STALE);
+    }
 }
 
 /* Do a full recompute (or at least try). If we're not allowed then
  * mark the node as "aborted".
  */
-static bool
+static void
 engine_recompute(struct engine_node *node, bool forced, bool allowed)
 {
     VLOG_DBG("node: %s, recompute (%s)", node->name,
@@ -145,12 +238,12 @@  engine_recompute(struct engine_node *node, bool forced, bool allowed)
 
     if (!allowed) {
         VLOG_DBG("node: %s, recompute aborted", node->name);
-        return false;
+        engine_set_node_state(node, EN_ABORTED);
+        return;
     }
 
+    /* Run the node handler which might change state. */
     node->run(node);
-    VLOG_DBG("node: %s, changed: %d", node->name, node->changed);
-    return true;
 }
 
 /* Return true if the node could be computed without triggerring a full
@@ -161,7 +254,7 @@  engine_compute(struct engine_node *node, bool recompute_allowed)
 {
     for (size_t i = 0; i < node->n_inputs; i++) {
         /* If the input node data changed call its change handler. */
-        if (node->inputs[i].node->changed) {
+        if (node->inputs[i].node->state == EN_UPDATED) {
             VLOG_DBG("node: %s, handle change for input %s",
                      node->name, node->inputs[i].node->name);
 
@@ -172,57 +265,61 @@  engine_compute(struct engine_node *node, bool recompute_allowed)
                 VLOG_DBG("node: %s, can't handle change for input %s, "
                          "fall back to recompute",
                          node->name, node->inputs[i].node->name);
-                if (!engine_recompute(node, false, recompute_allowed)) {
+                engine_recompute(node, false, recompute_allowed);
+                if (engine_aborted(node)) {
                     return false;
                 }
             }
         }
     }
-
     return true;
 }
 
-bool engine_run(struct engine_node *node, uint64_t run_id)
+static void
+engine_run_node(struct engine_node *node)
 {
-    if (node->run_id == run_id) {
-        /* The node was already updated in this run (could be input for
-         * multiple other nodes). Stop processing.
-         */
-        return true;
-    }
-
-    /* Initialize the node for this run. */
-    node->run_id = run_id;
-    node->changed = false;
-
     if (!node->n_inputs) {
+        /* Run the node handler which might change state. */
         node->run(node);
-        VLOG_DBG("node: %s, changed: %d", node->name, node->changed);
-        return true;
+        return;
     }
 
+    bool input_stale = false;
     for (size_t i = 0; i < node->n_inputs; i++) {
-        if (!engine_run(node->inputs[i].node, run_id)) {
-            return false;
+        if (!engine_node_valid(node->inputs[i].node)) {
+            /* If the input node aborted computation, move to EN_ABORTED.
+             * This will be propagated to following nodes.
+             */
+            if (engine_aborted(node->inputs[i].node)) {
+                engine_set_node_state(node, EN_ABORTED);
+            }
+
+            input_stale = true;
         }
     }
 
-    bool need_compute = false;
+    /* If at least one input is stale, don't change state. */
+    if (input_stale) {
+        return;
+    }
 
     if (engine_force_recompute) {
-        return engine_recompute(node, true, !engine_abort_recompute);
+        engine_recompute(node, true, !engine_abort_recompute);
+        return;
     }
 
     /* If any of the inputs updated data but there is no change_handler, then
      * recompute the current node too.
      */
+    bool need_compute = false;
     for (size_t i = 0; i < node->n_inputs; i++) {
-        if (node->inputs[i].node->changed) {
+        if (node->inputs[i].node->state == EN_UPDATED) {
             need_compute = true;
 
             /* Trigger a recompute if we don't have a change handler. */
             if (!node->inputs[i].change_handler) {
-                return engine_recompute(node, false, !engine_abort_recompute);
+                engine_recompute(node, false, !engine_abort_recompute);
+                return;
             }
         }
     }
@@ -231,33 +328,42 @@  bool engine_run(struct engine_node *node, uint64_t run_id)
         /* If we couldn't compute the node we either aborted or triggered
          * a full recompute. In any case, stop processing.
          */
-        return engine_compute(node, !engine_abort_recompute);
+        if (!engine_compute(node, !engine_abort_recompute)) {
+            return;
+        }
     }
 
-    VLOG_DBG("node: %s, changed: %d", node->name, node->changed);
-    return true;
+    /* If we reached this point, either the node was updated or its state is
+     * still valid.
+     */
+    if (!engine_node_changed(node)) {
+        engine_set_node_state(node, EN_VALID);
+    }
 }
 
-bool
-engine_need_run(struct engine_node *node, uint64_t run_id)
+void
+engine_run(struct engine_node **nodes, size_t n_count)
 {
-    size_t i;
-
-    if (node->run_id == run_id) {
-        return false;
+    for (size_t i = 0; i < n_count; i++) {
+        engine_run_node(nodes[i]);
     }
+}
 
-    if (!node->n_inputs) {
-        node->run(node);
-        VLOG_DBG("input node: %s, changed: %d", node->name, node->changed);
-        return node->changed;
-    }
+bool
+engine_need_run(struct engine_node **nodes, size_t n_count)
+{
+    for (size_t i = 0; i < n_count; i++) {
+        /* Check only leaf nodes. */
+        if (nodes[i]->n_inputs) {
+            continue;
+        }
 
-    for (i = 0; i < node->n_inputs; i++) {
-        if (engine_need_run(node->inputs[i].node, run_id)) {
+        nodes[i]->run(nodes[i]);
+        VLOG_DBG("input node: %s, state: %s", nodes[i]->name,
+                 engine_node_state_name[nodes[i]->state]);
+        if (nodes[i]->state == EN_UPDATED) {
             return true;
         }
     }
-
     return false;
 }
diff --git a/lib/inc-proc-eng.h b/lib/inc-proc-eng.h
index abd41b2..9a35f1f 100644
--- a/lib/inc-proc-eng.h
+++ b/lib/inc-proc-eng.h
@@ -82,10 +82,21 @@  struct engine_node_input {
     bool (*change_handler)(struct engine_node *node);
 };
 
-struct engine_node {
-    /* A unique id to distinguish each iteration of the engine_run(). */
-    uint64_t run_id;
+enum engine_node_state {
+    EN_STALE,     /* Data in the node is not up to date with the DB. */
+    EN_UPDATED,   /* Data in the node is valid but was updated during the
+                   * last run.
+                   */
+    EN_VALID,     /* Data in the node is valid and didn't change during the
+                   * last run.
+                   */
+    EN_ABORTED,   /* During the last run, processing was aborted for
+                   * this node.
+                   */
+    EN_STATE_MAX,
+};
 
+struct engine_node {
     /* A unique name for each node. */
     char *name;
 
@@ -102,8 +113,8 @@  struct engine_node {
      * node. */
     void *data;
 
-    /* Whether the data changed in the last engine run. */
-    bool changed;
+    /* State of the node after the last engine run. */
+    enum engine_node_state state;
 
     /* Method to initialize data. It may be NULL. */
     void (*init)(struct engine_node *);
@@ -116,23 +127,35 @@  struct engine_node {
     void (*run)(struct engine_node *);
 };
 
-/* Initialize the data for the engine nodes recursively. It calls each node's
+/* Return the array of topologically sorted nodes when starting from
+ * 'root_node'. Stores the number of nodes in 'n_count'.
+ * It should be called before the main loop.
+ */
+struct engine_node **engine_get_nodes(struct engine_node *root_node,
+                                      size_t *n_count);
+
+/* Initialize the data for the engine nodes. It calls each node's
  * init() method if not NULL. It should be called before the main loop. */
-void engine_init(struct engine_node *);
+void engine_init(struct engine_node **nodes, size_t n_count);
+
+/* Initialize the engine nodes for a new run. It should be called in the
+ * main processing loop before every potential engine_run().
+ */
+void engine_init_run(struct engine_node **nodes, size_t n_count,
+                     struct engine_node *root_node);
 
 /* Execute the processing recursively, which should be called in the main
- * loop. Returns true if the execution is compelte, false if it is aborted,
- * which could happen when engine_abort_recompute is set. */
-bool engine_run(struct engine_node *, uint64_t run_id);
+ * loop. Updates the engine node's states accordingly.
+ */
+void engine_run(struct engine_node **nodes, size_t n_count);
 
-/* Clean up the data for the engine nodes recursively. It calls each node's
+/* Clean up the data for the engine nodes. It calls each node's
  * cleanup() method if not NULL. It should be called before the program
  * terminates. */
-void engine_cleanup(struct engine_node *);
+void engine_cleanup(struct engine_node **nodes, size_t n_count);
 
 /* Check if engine needs to run but didn't. */
-bool
-engine_need_run(struct engine_node *, uint64_t run_id);
+bool engine_need_run(struct engine_node **nodes, size_t n_count);
 
 /* Get the input node with <name> for <node> */
 struct engine_node * engine_get_input(const char *input_name,
@@ -159,8 +182,22 @@  const struct engine_context * engine_get_context(void);
 
 void engine_set_context(const struct engine_context *);
 
-/* Return true if the engine has run for 'node' in the 'run_id' iteration. */
-bool engine_has_run(struct engine_node *node, uint64_t run_id);
+void engine_set_node_state_at(struct engine_node *node,
+                              enum engine_node_state state,
+                              const char *where);
+
+/* Return true if during the last iteration the node's data was updated. */
+bool engine_node_changed(struct engine_node *node);
+
+/* Return true if the engine has run for 'node' in the last iteration. */
+bool engine_has_run(struct engine_node *node);
+
+/* Returns true if during the last engine run we had to abort processing. */
+bool engine_aborted(struct engine_node *node);
+
+/* Set the state of the node and log changes. */
+#define engine_set_node_state(node, state) \
+    engine_set_node_state_at(node, state, OVS_SOURCE_LOCATOR)
 
 struct ed_ovsdb_index {
     const char *name;
@@ -187,6 +224,7 @@  void engine_ovsdb_node_add_index(struct engine_node *, const char *name,
     struct engine_node en_##NAME = { \
         .name = NAME_STR, \
         .data = &ed_##NAME, \
+        .state = EN_STALE, \
         .init = en_##NAME##_init, \
         .run = en_##NAME##_run, \
         .cleanup = en_##NAME##_cleanup, \
@@ -201,10 +239,10 @@  en_##DB_NAME##_##TBL_NAME##_run(struct engine_node *node) \
     const struct DB_NAME##rec_##TBL_NAME##_table *table = \
         EN_OVSDB_GET(node); \
     if (DB_NAME##rec_##TBL_NAME##_table_track_get_first(table)) { \
-        node->changed = true; \
+        engine_set_node_state(node, EN_UPDATED); \
         return; \
     } \
-    node->changed = false; \
+    engine_set_node_state(node, EN_VALID); \
 } \
 static void (*en_##DB_NAME##_##TBL_NAME##_init)(struct engine_node *node) \
             = NULL; \