diff mbox series

[ovs-dev,v3] inc-proc-eng: move inc-proc code in an isolated strucuture

Message ID f85175b304a03dbcaa51c9a1e91db04b366d5457.1640008244.git.lorenzo.bianconi@redhat.com
State Superseded
Headers show
Series [ovs-dev,v3] inc-proc-eng: move inc-proc code in an isolated strucuture | expand

Checks

Context Check Description
ovsrobot/apply-robot success apply and check: success
ovsrobot/github-robot-_Build_and_Test success github build: passed
ovsrobot/github-robot-_ovn-kubernetes success github build: passed

Commit Message

Lorenzo Bianconi Dec. 20, 2021, 1:52 p.m. UTC
Remove global state variable and move move inc-proc code in an isolated
strucuture. This is a preliminary patch to add the capability to run
multiple inc-proc engines.

Signed-off-by: Lorenzo Bianconi <lorenzo.bianconi@redhat.com>
---
Changes since v2:
- cosmetics
- move unixctl commands registration in a dedicated routine
- add list-engines unixctl command

Changes since v1:
- fix unixctl commands for IP engine.
---
 controller/ovn-controller.c |  65 ++++++-----
 lib/inc-proc-eng.c          | 225 +++++++++++++++++++++++-------------
 lib/inc-proc-eng.h          |  42 +++++--
 northd/en-lflow.c           |   2 +-
 northd/en-northd.c          |   2 +-
 northd/inc-proc-northd.c    |  30 ++---
 6 files changed, 230 insertions(+), 136 deletions(-)

Comments

Han Zhou Dec. 20, 2021, 6:03 p.m. UTC | #1
On Mon, Dec 20, 2021 at 5:53 AM Lorenzo Bianconi <
lorenzo.bianconi@redhat.com> wrote:
>
> Remove global state variable and move move inc-proc code in an isolated
> strucuture. This is a preliminary patch to add the capability to run
> multiple inc-proc engines.
>

Thanks Lorenzo! Could you tell more about the use case when multiple
inc-proc engines are required?

Han

> Signed-off-by: Lorenzo Bianconi <lorenzo.bianconi@redhat.com>
> ---
> Changes since v2:
> - cosmetics
> - move unixctl commands registration in a dedicated routine
> - add list-engines unixctl command
>
> Changes since v1:
> - fix unixctl commands for IP engine.
> ---
>  controller/ovn-controller.c |  65 ++++++-----
>  lib/inc-proc-eng.c          | 225 +++++++++++++++++++++++-------------
>  lib/inc-proc-eng.h          |  42 +++++--
>  northd/en-lflow.c           |   2 +-
>  northd/en-northd.c          |   2 +-
>  northd/inc-proc-northd.c    |  30 ++---
>  6 files changed, 230 insertions(+), 136 deletions(-)
>
> diff --git a/controller/ovn-controller.c b/controller/ovn-controller.c
> index 5069aedfc..86cb6f769 100644
> --- a/controller/ovn-controller.c
> +++ b/controller/ovn-controller.c
> @@ -114,6 +114,8 @@ static unixctl_cb_func debug_delay_nb_cfg_report;
>  #define OVS_NB_CFG_TS_NAME "ovn-nb-cfg-ts"
>  #define OVS_STARTUP_TS_NAME "ovn-startup-ts"
>
> +static struct engine *flow_engine;
> +
>  static char *parse_options(int argc, char *argv[]);
>  OVS_NO_RETURN static void usage(void);
>
> @@ -557,7 +559,7 @@ update_sb_db(struct ovsdb_idl *ovs_idl, struct
ovsdb_idl *ovnsb_idl,
>      }
>      if (reset_ovnsb_idl_min_index && *reset_ovnsb_idl_min_index) {
>          VLOG_INFO("Resetting southbound database cluster state");
> -        engine_set_force_recompute(true);
> +        engine_set_force_recompute(flow_engine, true);
>          ovsdb_idl_reset_min_index(ovnsb_idl);
>          *reset_ovnsb_idl_min_index = false;
>      }
> @@ -1011,7 +1013,8 @@ en_ofctrl_is_connected_cleanup(void *data
OVS_UNUSED)
>  static void
>  en_ofctrl_is_connected_run(struct engine_node *node, void *data)
>  {
> -    struct controller_engine_ctx *ctrl_ctx =
engine_get_context()->client_ctx;
> +    struct controller_engine_ctx *ctrl_ctx =
> +        engine_get_context(flow_engine)->client_ctx;
>      struct ed_type_ofctrl_is_connected *of_data = data;
>      if (of_data->connected != ofctrl_is_connected()) {
>          of_data->connected = !of_data->connected;
> @@ -1226,10 +1229,11 @@ init_binding_ctx(struct engine_node *node,
>                  engine_get_input("SB_port_binding", node),
>                  "datapath");
>
> -    struct controller_engine_ctx *ctrl_ctx =
engine_get_context()->client_ctx;
> +    struct controller_engine_ctx *ctrl_ctx =
> +        engine_get_context(flow_engine)->client_ctx;
>
> -    b_ctx_in->ovnsb_idl_txn = engine_get_context()->ovnsb_idl_txn;
> -    b_ctx_in->ovs_idl_txn = engine_get_context()->ovs_idl_txn;
> +    b_ctx_in->ovnsb_idl_txn =
engine_get_context(flow_engine)->ovnsb_idl_txn;
> +    b_ctx_in->ovs_idl_txn = engine_get_context(flow_engine)->ovs_idl_txn;
>      b_ctx_in->sbrec_datapath_binding_by_key =
sbrec_datapath_binding_by_key;
>      b_ctx_in->sbrec_port_binding_by_datapath =
sbrec_port_binding_by_datapath;
>      b_ctx_in->sbrec_port_binding_by_name = sbrec_port_binding_by_name;
> @@ -2387,7 +2391,8 @@ en_lflow_output_run(struct engine_node *node, void
*data)
>          lflow_conj_ids_clear(&fo->conj_ids);
>      }
>
> -    struct controller_engine_ctx *ctrl_ctx =
engine_get_context()->client_ctx;
> +    struct controller_engine_ctx *ctrl_ctx =
> +        engine_get_context(flow_engine)->client_ctx;
>
>      fo->pd.lflow_cache = ctrl_ctx->lflow_cache;
>
> @@ -3040,7 +3045,7 @@ check_northd_version(struct ovsdb_idl *ovs_idl,
struct ovsdb_idl *ovnsb_idl,
>       * full recompute.
>       */
>      if (version_mismatch) {
> -        engine_set_force_recompute(true);
> +        engine_set_force_recompute(flow_engine, true);
>      }
>      version_mismatch = false;
>      return true;
> @@ -3206,6 +3211,8 @@ main(int argc, char *argv[])
>      stopwatch_create(BFD_RUN_STOPWATCH_NAME, SW_MS);
>      stopwatch_create(VIF_PLUG_RUN_STOPWATCH_NAME, SW_MS);
>
> +    engine_init_global();
> +
>      /* Define inc-proc-engine nodes. */
>      ENGINE_NODE_WITH_CLEAR_TRACK_DATA_IS_VALID(ct_zones, "ct_zones");
>      ENGINE_NODE_WITH_CLEAR_TRACK_DATA(runtime_data, "runtime_data");
> @@ -3344,7 +3351,7 @@ main(int argc, char *argv[])
>          .sb_idl = ovnsb_idl_loop.idl,
>          .ovs_idl = ovs_idl_loop.idl,
>      };
> -    engine_init(&en_flow_output, &engine_arg);
> +    flow_engine = engine_new(&en_flow_output, &engine_arg,
"flow_engine");
>
>      engine_ovsdb_node_add_index(&en_sb_chassis, "name",
sbrec_chassis_by_name);
>      engine_ovsdb_node_add_index(&en_sb_multicast_group, "name_datapath",
> @@ -3396,7 +3403,7 @@ main(int argc, char *argv[])
>
>      unixctl_command_register("recompute", "[deprecated]", 0, 0,
>                               engine_recompute_cmd,
> -                             NULL);
> +                             flow_engine);
>      unixctl_command_register("lflow-cache/flush", "", 0, 0,
>                               lflow_cache_flush_cmd,
>                               &lflow_output_data->pd);
> @@ -3480,7 +3487,7 @@ main(int argc, char *argv[])
>              goto loop_done;
>          }
>
> -        engine_init_run();
> +        engine_init_run(flow_engine);
>
>          struct ovsdb_idl_txn *ovs_idl_txn =
ovsdb_idl_loop_run(&ovs_idl_loop);
>          unsigned int new_ovs_cond_seqno
> @@ -3488,7 +3495,7 @@ main(int argc, char *argv[])
>          if (new_ovs_cond_seqno != ovs_cond_seqno) {
>              if (!new_ovs_cond_seqno) {
>                  VLOG_INFO("OVS IDL reconnected, force recompute.");
> -                engine_set_force_recompute(true);
> +                engine_set_force_recompute(flow_engine, true);
>              }
>              ovs_cond_seqno = new_ovs_cond_seqno;
>          }
> @@ -3506,7 +3513,7 @@ main(int argc, char *argv[])
>          if (new_ovnsb_cond_seqno != ovnsb_cond_seqno) {
>              if (!new_ovnsb_cond_seqno) {
>                  VLOG_INFO("OVNSB IDL reconnected, force recompute.");
> -                engine_set_force_recompute(true);
> +                engine_set_force_recompute(flow_engine, true);
>                  vif_plug_reset_idl_prime_counter();
>              }
>              ovnsb_cond_seqno = new_ovnsb_cond_seqno;
> @@ -3518,7 +3525,7 @@ main(int argc, char *argv[])
>              .client_ctx = &ctrl_engine_ctx
>          };
>
> -        engine_set_context(&eng_ctx);
> +        engine_set_context(flow_engine, &eng_ctx);
>
>          bool northd_version_match =
>              check_northd_version(ovs_idl_loop.idl, ovnsb_idl_loop.idl,
> @@ -3584,7 +3591,7 @@ main(int argc, char *argv[])
>                                             &br_int_dp->capabilities :
NULL,
>                                             br_int ? br_int->name :
NULL)) {
>                  VLOG_INFO("OVS feature set changed, force recompute.");
> -                engine_set_force_recompute(true);
> +                engine_set_force_recompute(flow_engine, true);
>              }
>
>              if (br_int) {
> @@ -3619,9 +3626,9 @@ main(int argc, char *argv[])
>                               * this round of engine_run and continue
processing
>                               * acculated changes incrementally later when
>                               * ofctrl_can_put() returns true. */
> -                            engine_run(false);
> +                            engine_run(flow_engine, false);
>                          } else {
> -                            engine_run(true);
> +                            engine_run(flow_engine, true);
>                          }
>                      } else {
>                          /* Even if there's no SB DB transaction
available,
> @@ -3630,7 +3637,7 @@ main(int argc, char *argv[])
>                           * If a recompute is required, the engine will
abort,
>                           * triggerring a full run in the next iteration.
>                           */
> -                        engine_run(false);
> +                        engine_run(flow_engine, false);
>                      }
>                      stopwatch_stop(CONTROLLER_LOOP_STOPWATCH_NAME,
>                                     time_msec());
> @@ -3775,24 +3782,24 @@ main(int argc, char *argv[])
>
>              }
>
> -            if (!engine_has_run()) {
> -                if (engine_need_run()) {
> +            if (!engine_has_run(flow_engine)) {
> +                if (engine_need_run(flow_engine)) {
>                      VLOG_DBG("engine did not run, force recompute next
time: "
>                               "br_int %p, chassis %p", br_int, chassis);
> -                    engine_set_force_recompute(true);
> +                    engine_set_force_recompute(flow_engine, true);
>                      poll_immediate_wake();
>                  } else {
>                      VLOG_DBG("engine did not run, and it was not needed"
>                               " either: br_int %p, chassis %p",
>                               br_int, chassis);
>                  }
> -            } else if (engine_aborted()) {
> +            } else if (engine_aborted(flow_engine)) {
>                  VLOG_DBG("engine was aborted, force recompute next time:
"
>                           "br_int %p, chassis %p", br_int, chassis);
> -                engine_set_force_recompute(true);
> +                engine_set_force_recompute(flow_engine, true);
>                  poll_immediate_wake();
>              } else {
> -                engine_set_force_recompute(false);
> +                engine_set_force_recompute(flow_engine, false);
>              }
>
>              store_nb_cfg(ovnsb_idl_txn, ovs_idl_txn, chassis_private,
> @@ -3846,7 +3853,7 @@ main(int argc, char *argv[])
>
>          if (!ovsdb_idl_loop_commit_and_wait(&ovnsb_idl_loop)) {
>              VLOG_INFO("OVNSB commit failed, force recompute next time.");
> -            engine_set_force_recompute(true);
> +            engine_set_force_recompute(flow_engine, true);
>          }
>
>          int ovs_txn_status =
ovsdb_idl_loop_commit_and_wait(&ovs_idl_loop);
> @@ -3896,8 +3903,8 @@ loop_done:
>          }
>      }
>
> -    engine_set_context(NULL);
> -    engine_cleanup();
> +    engine_set_context(flow_engine, NULL);
> +    engine_cleanup(flow_engine);
>
>      /* It's time to exit.  Clean up the databases if we are not
restarting */
>      if (!restart) {
> @@ -4152,9 +4159,9 @@ inject_pkt(struct unixctl_conn *conn, int argc
OVS_UNUSED,
>
>  static void
>  engine_recompute_cmd(struct unixctl_conn *conn OVS_UNUSED, int argc
OVS_UNUSED,
> -                     const char *argv[] OVS_UNUSED, void *arg OVS_UNUSED)
> +                     const char *argv[] OVS_UNUSED, void *arg)
>  {
> -    engine_trigger_recompute();
> +    engine_trigger_recompute(arg);
>      unixctl_command_reply(conn, NULL);
>  }
>
> @@ -4166,7 +4173,7 @@ lflow_cache_flush_cmd(struct unixctl_conn *conn
OVS_UNUSED,
>      VLOG_INFO("User triggered lflow cache flush.");
>      struct lflow_output_persistent_data *fo_pd = arg_;
>      lflow_cache_flush(fo_pd->lflow_cache);
> -    engine_set_force_recompute(true);
> +    engine_set_force_recompute(flow_engine, true);
>      poll_immediate_wake();
>      unixctl_command_reply(conn, NULL);
>  }
> diff --git a/lib/inc-proc-eng.c b/lib/inc-proc-eng.c
> index 2958a55e3..9468b4ce0 100644
> --- a/lib/inc-proc-eng.c
> +++ b/lib/inc-proc-eng.c
> @@ -33,12 +33,7 @@
>
>  VLOG_DEFINE_THIS_MODULE(inc_proc_eng);
>
> -static bool engine_force_recompute = false;
> -static bool engine_run_aborted = false;
> -static const struct engine_context *engine_context;
> -
> -static struct engine_node **engine_nodes;
> -static size_t engine_n_nodes;
> +static struct ovs_list engines = OVS_LIST_INITIALIZER(&engines);
>
>  static const char *engine_node_state_name[EN_STATE_MAX] = {
>      [EN_STALE]     = "Stale",
> @@ -52,21 +47,21 @@ engine_recompute(struct engine_node *node, bool
allowed,
>                   const char *reason_fmt, ...) OVS_PRINTF_FORMAT(3, 4);
>
>  void
> -engine_set_force_recompute(bool val)
> +engine_set_force_recompute(struct engine *e, bool val)
>  {
> -    engine_force_recompute = val;
> +    e->engine_force_recompute = val;
>  }
>
>  const struct engine_context *
> -engine_get_context(void)
> +engine_get_context(struct engine *e)
>  {
> -    return engine_context;
> +    return e->engine_context;
>  }
>
>  void
> -engine_set_context(const struct engine_context *ctx)
> +engine_set_context(struct engine *e, const struct engine_context *ctx)
>  {
> -    engine_context = ctx;
> +    e->engine_context = ctx;
>  }
>
>  /* Builds the topologically sorted 'sorted_nodes' array starting from
> @@ -113,30 +108,53 @@ static void
>  engine_clear_stats(struct unixctl_conn *conn, int argc OVS_UNUSED,
>                     const char *argv[] OVS_UNUSED, void *arg OVS_UNUSED)
>  {
> -    for (size_t i = 0; i < engine_n_nodes; i++) {
> -        struct engine_node *node = engine_nodes[i];
> +    const char *target = argc == 2 ? argv[1] : NULL;
> +    struct ds reply = DS_EMPTY_INITIALIZER;
> +    struct engine *e;
> +
> +    ds_put_format(&reply, "no %s engine found", target ? target : "");
> +    LIST_FOR_EACH (e, node, &engines) {
> +        for (size_t i = 0; i < e->engine_n_nodes; i++) {
> +            struct engine_node *node = e->engine_nodes[i];
>
> -        memset(&node->stats, 0, sizeof node->stats);
> +            if (target && strcmp(target, e->name)) {
> +                continue;
> +            }
> +            memset(&node->stats, 0, sizeof node->stats);
> +            ds_clear(&reply);
> +        }
>      }
> -    unixctl_command_reply(conn, NULL);
> +
> +    unixctl_command_reply(conn, ds_cstr(&reply));
> +    ds_destroy(&reply);
>  }
>
>  static void
> -engine_dump_stats(struct unixctl_conn *conn, int argc OVS_UNUSED,
> +engine_dump_stats(struct unixctl_conn *conn, int argc,
>                    const char *argv[] OVS_UNUSED, void *arg OVS_UNUSED)
>  {
> +    const char *target = argc == 2 ? argv[1] : NULL;
>      struct ds dump = DS_EMPTY_INITIALIZER;
> +    struct engine *e;
>
> -    for (size_t i = 0; i < engine_n_nodes; i++) {
> -        struct engine_node *node = engine_nodes[i];
> +    LIST_FOR_EACH (e, node, &engines) {
> +        for (size_t i = 0; i < e->engine_n_nodes; i++) {
> +            struct engine_node *node = e->engine_nodes[i];
>
> -        ds_put_format(&dump,
> -                      "Node: %s\n"
> -                      "- recompute: %12"PRIu64"\n"
> -                      "- compute:   %12"PRIu64"\n"
> -                      "- abort:     %12"PRIu64"\n",
> -                      node->name, node->stats.recompute,
> -                      node->stats.compute, node->stats.abort);
> +            if (target && strcmp(target, e->name)) {
> +                continue;
> +            }
> +            ds_put_format(&dump,
> +                          "Node: %s\n"
> +                          "- recompute: %12"PRIu64"\n"
> +                          "- compute:   %12"PRIu64"\n"
> +                          "- abort:     %12"PRIu64"\n",
> +                          node->name, node->stats.recompute,
> +                          node->stats.compute, node->stats.abort);
> +        }
> +    }
> +    if (ds_last(&dump) == EOF) {
> +        ds_put_format(&dump, "no %s engine found", target ? target : "");
>      }
>      unixctl_command_reply(conn, ds_cstr(&dump));
>
> @@ -148,48 +166,91 @@ engine_trigger_recompute_cmd(struct unixctl_conn
*conn, int argc OVS_UNUSED,
>                               const char *argv[] OVS_UNUSED,
>                               void *arg OVS_UNUSED)
>  {
> -    engine_trigger_recompute();
> -    unixctl_command_reply(conn, NULL);
> +    const char *target = argc == 2 ? argv[1] : NULL;
> +    struct ds reply = DS_EMPTY_INITIALIZER;
> +    struct engine *e;
> +
> +    ds_put_format(&reply, "no %s engine found", target ? target : "");
> +    LIST_FOR_EACH (e, node, &engines) {
> +        if (target && strcmp(target, e->name)) {
> +            continue;
> +        }
> +        engine_trigger_recompute(e);
> +        ds_clear(&reply);
> +    }
> +
> +    unixctl_command_reply(conn, ds_cstr(&reply));
> +    ds_destroy(&reply);
>  }
>
> -void
> -engine_init(struct engine_node *node, struct engine_arg *arg)
> +static void
> +engine_list_engines(struct unixctl_conn *conn, int argc OVS_UNUSED,
> +                    const char *argv[] OVS_UNUSED,
> +                    void *arg OVS_UNUSED)
>  {
> -    engine_nodes = engine_get_nodes(node, &engine_n_nodes);
> +    struct ds reply = DS_EMPTY_INITIALIZER;
> +    struct engine *e;
>
> -    for (size_t i = 0; i < engine_n_nodes; i++) {
> -        if (engine_nodes[i]->init) {
> -            engine_nodes[i]->data =
> -                engine_nodes[i]->init(engine_nodes[i], arg);
> -        } else {
> -            engine_nodes[i]->data = NULL;
> -        }
> +    LIST_FOR_EACH (e, node, &engines) {
> +            ds_put_format(&reply, "%s\n", e->name);
>      }
> +    unixctl_command_reply(conn, ds_cstr(&reply));
> +    ds_destroy(&reply);
> +}
>
> -    unixctl_command_register("inc-engine/show-stats", "", 0, 0,
> +void
> +engine_init_global(void)
> +{
> +    unixctl_command_register("inc-engine/show-stats", "[engine]", 0, 1,
>                               engine_dump_stats, NULL);
> -    unixctl_command_register("inc-engine/clear-stats", "", 0, 0,
> +    unixctl_command_register("inc-engine/clear-stats", "[engine]", 0, 1,
>                               engine_clear_stats, NULL);
> -    unixctl_command_register("inc-engine/recompute", "", 0, 0,
> +    unixctl_command_register("inc-engine/recompute", "[engine]", 0, 1,
>                               engine_trigger_recompute_cmd, NULL);
> +    unixctl_command_register("inc-engine/list-engines", "", 0, 0,
> +                             engine_list_engines, NULL);
> +}
> +
> +struct engine *
> +engine_new(struct engine_node *node, struct engine_arg *arg,
> +           const char *name)
> +{
> +    struct engine *e = xzalloc(sizeof *e);
> +
> +    e->engine_nodes = engine_get_nodes(node, &e->engine_n_nodes);
> +    e->name = name;
> +
> +    for (size_t i = 0; i < e->engine_n_nodes; i++) {
> +        if (e->engine_nodes[i]->init) {
> +            e->engine_nodes[i]->data =
> +                e->engine_nodes[i]->init(e->engine_nodes[i], arg);
> +        } else {
> +            e->engine_nodes[i]->data = NULL;
> +        }
> +        e->engine_nodes[i]->e = e;
> +    }
> +
> +    ovs_list_push_back(&engines, &e->node);
> +
> +    return e;
>  }
>
>  void
> -engine_cleanup(void)
> +engine_cleanup(struct engine *e)
>  {
> -    for (size_t i = 0; i < engine_n_nodes; i++) {
> -        if (engine_nodes[i]->clear_tracked_data) {
> -            engine_nodes[i]->clear_tracked_data(engine_nodes[i]->data);
> +    for (size_t i = 0; i < e->engine_n_nodes; i++) {
> +        if (e->engine_nodes[i]->clear_tracked_data) {
> +            e->engine_nodes[i]->clear_tracked_data(
> +                    e->engine_nodes[i]->data);
>          }
>
> -        if (engine_nodes[i]->cleanup) {
> -            engine_nodes[i]->cleanup(engine_nodes[i]->data);
> +        if (e->engine_nodes[i]->cleanup) {
> +            e->engine_nodes[i]->cleanup(e->engine_nodes[i]->data);
>          }
> -        free(engine_nodes[i]->data);
> +        free(e->engine_nodes[i]->data);
>      }
> -    free(engine_nodes);
> -    engine_nodes = NULL;
> -    engine_n_nodes = 0;
> +    ovs_list_remove(&e->node);
> +    free(e->engine_nodes);
>  }
>
>  struct engine_node *
> @@ -284,10 +345,10 @@ engine_node_changed(struct engine_node *node)
>  }
>
>  bool
> -engine_has_run(void)
> +engine_has_run(struct engine *e)
>  {
> -    for (size_t i = 0; i < engine_n_nodes; i++) {
> -        if (engine_nodes[i]->state != EN_STALE) {
> +    for (size_t i = 0; i < e->engine_n_nodes; i++) {
> +        if (e->engine_nodes[i]->state != EN_STALE) {
>              return true;
>          }
>      }
> @@ -295,9 +356,9 @@ engine_has_run(void)
>  }
>
>  bool
> -engine_aborted(void)
> +engine_aborted(struct engine *e)
>  {
> -    return engine_run_aborted;
> +    return e->engine_run_aborted;
>  }
>
>  void *
> @@ -316,14 +377,15 @@ engine_get_internal_data(struct engine_node *node)
>  }
>
>  void
> -engine_init_run(void)
> +engine_init_run(struct engine *e)
>  {
>      VLOG_DBG("Initializing new run");
> -    for (size_t i = 0; i < engine_n_nodes; i++) {
> -        engine_set_node_state(engine_nodes[i], EN_STALE);
> +    for (size_t i = 0; i < e->engine_n_nodes; i++) {
> +        engine_set_node_state(e->engine_nodes[i], EN_STALE);
>
> -        if (engine_nodes[i]->clear_tracked_data) {
> -            engine_nodes[i]->clear_tracked_data(engine_nodes[i]->data);
> +        if (e->engine_nodes[i]->clear_tracked_data) {
> +            e->engine_nodes[i]->clear_tracked_data(
> +                    e->engine_nodes[i]->data);
>          }
>      }
>  }
> @@ -397,7 +459,8 @@ engine_compute(struct engine_node *node, bool
recompute_allowed)
>  }
>
>  static void
> -engine_run_node(struct engine_node *node, bool recompute_allowed)
> +engine_run_node(struct engine *e, struct engine_node *node,
> +                bool recompute_allowed)
>  {
>      if (!node->n_inputs) {
>          /* Run the node handler which might change state. */
> @@ -406,7 +469,7 @@ engine_run_node(struct engine_node *node, bool
recompute_allowed)
>          return;
>      }
>
> -    if (engine_force_recompute) {
> +    if (e->engine_force_recompute) {
>          engine_recompute(node, recompute_allowed, "forced");
>          return;
>      }
> @@ -447,41 +510,41 @@ engine_run_node(struct engine_node *node, bool
recompute_allowed)
>  }
>
>  void
> -engine_run(bool recompute_allowed)
> +engine_run(struct engine *e, bool recompute_allowed)
>  {
>      /* If the last run was aborted skip the incremental run because a
>       * recompute is needed first.
>       */
> -    if (!recompute_allowed && engine_run_aborted) {
> +    if (!recompute_allowed && e->engine_run_aborted) {
>          return;
>      }
>
> -    engine_run_aborted = false;
> -    for (size_t i = 0; i < engine_n_nodes; i++) {
> -        engine_run_node(engine_nodes[i], recompute_allowed);
> +    e->engine_run_aborted = false;
> +    for (size_t i = 0; i < e->engine_n_nodes; i++) {
> +        engine_run_node(e, e->engine_nodes[i], recompute_allowed);
>
> -        if (engine_nodes[i]->state == EN_ABORTED) {
> -            engine_nodes[i]->stats.abort++;
> -            engine_run_aborted = true;
> +        if (e->engine_nodes[i]->state == EN_ABORTED) {
> +            e->engine_nodes[i]->stats.abort++;
> +            e->engine_run_aborted = true;
>              return;
>          }
>      }
>  }
>
>  bool
> -engine_need_run(void)
> +engine_need_run(struct engine *e)
>  {
> -    for (size_t i = 0; i < engine_n_nodes; i++) {
> +    for (size_t i = 0; i < e->engine_n_nodes; i++) {
>          /* Check only leaf nodes for updates. */
> -        if (engine_nodes[i]->n_inputs) {
> +        if (e->engine_nodes[i]->n_inputs) {
>              continue;
>          }
>
> -        engine_nodes[i]->run(engine_nodes[i], engine_nodes[i]->data);
> -        engine_nodes[i]->stats.recompute++;
> -        VLOG_DBG("input node: %s, state: %s", engine_nodes[i]->name,
> -                 engine_node_state_name[engine_nodes[i]->state]);
> -        if (engine_nodes[i]->state == EN_UPDATED) {
> +        e->engine_nodes[i]->run(e->engine_nodes[i],
e->engine_nodes[i]->data);
> +        e->engine_nodes[i]->stats.recompute++;
> +        VLOG_DBG("input node: %s, state: %s", e->engine_nodes[i]->name,
> +                 engine_node_state_name[e->engine_nodes[i]->state]);
> +        if (e->engine_nodes[i]->state == EN_UPDATED) {
>              return true;
>          }
>      }
> @@ -489,9 +552,9 @@ engine_need_run(void)
>  }
>
>  void
> -engine_trigger_recompute(void)
> +engine_trigger_recompute(struct engine *e)
>  {
>      VLOG_INFO("User triggered force recompute.");
> -    engine_set_force_recompute(true);
> +    engine_set_force_recompute(e, true);
>      poll_immediate_wake();
>  }
> diff --git a/lib/inc-proc-eng.h b/lib/inc-proc-eng.h
> index 9bfab1f7c..881242138 100644
> --- a/lib/inc-proc-eng.h
> +++ b/lib/inc-proc-eng.h
> @@ -67,6 +67,7 @@
>  #include <stdint.h>
>
>  #include "compiler.h"
> +#include "openvswitch/list.h"
>
>  struct engine_context {
>      struct ovsdb_idl_txn *ovs_idl_txn;
> @@ -122,6 +123,8 @@ struct engine_stats {
>  };
>
>  struct engine_node {
> +    struct engine *e;
> +
>      /* A unique name for each node. */
>      char *name;
>
> @@ -173,30 +176,47 @@ struct engine_node {
>      struct engine_stats stats;
>  };
>
> +struct engine {
> +    struct ovs_list node;
> +
> +    const char *name;
> +
> +    struct engine_node **engine_nodes;
> +    size_t engine_n_nodes;
> +
> +    bool engine_force_recompute;
> +    bool engine_run_aborted;
> +
> +    const struct engine_context *engine_context;
> +};
> +
> +void engine_init_global(void);
> +
>  /* Initialize the data for the engine nodes. It calls each node's
>   * init() method if not NULL passing the user supplied 'arg'.
>   * It should be called before the main loop. */
> -void engine_init(struct engine_node *node, struct engine_arg *arg);
> +struct engine *engine_new(struct engine_node *node, struct engine_arg
*arg,
> +                          const char *name);
>
>  /* Initialize the engine nodes for a new run. It should be called in the
>   * main processing loop before every potential engine_run().
>   */
> -void engine_init_run(void);
> +void engine_init_run(struct engine *e);
>
>  /* Execute the processing, which should be called in the main loop.
>   * Updates the engine node's states accordingly. If 'recompute_allowed'
is
>   * false and a recompute is required by the current engine run then the
engine
>   * aborts.
>   */
> -void engine_run(bool recompute_allowed);
> +void engine_run(struct engine *e, bool recompute_allowed);
>
>  /* Clean up the data for the engine nodes. It calls each node's
>   * cleanup() method if not NULL. It should be called before the program
>   * terminates. */
> -void engine_cleanup(void);
> +void engine_cleanup(struct engine *e);
>
>  /* Check if engine needs to run but didn't. */
> -bool engine_need_run(void);
> +bool engine_need_run(struct engine *e);
>
>  /* Get the input node with <name> for <node> */
>  struct engine_node * engine_get_input(const char *input_name,
> @@ -216,7 +236,7 @@ void engine_add_input(struct engine_node *node,
struct engine_node *input,
>   * in circumstances when we are not sure there is change or not, or
>   * when there is change but the engine couldn't be executed in that
>   * iteration, and the change can't be tracked across iterations */
> -void engine_set_force_recompute(bool val);
> +void engine_set_force_recompute(struct engine *e, bool val);
>
>  /* Return the current engine_context. The values in the context can be
NULL
>   * if the engine is run with allow_recompute == false in the current
> @@ -224,9 +244,9 @@ void engine_set_force_recompute(bool val);
>   * Therefore, it is the responsibility of the caller to check the context
>   * values when called from change handlers.
>   */
> -const struct engine_context *engine_get_context(void);
> +const struct engine_context *engine_get_context(struct engine *e);
>
> -void engine_set_context(const struct engine_context *);
> +void engine_set_context(struct engine *e, const struct engine_context *);
>
>  void engine_set_node_state_at(struct engine_node *node,
>                                enum engine_node_state state,
> @@ -236,10 +256,10 @@ void engine_set_node_state_at(struct engine_node
*node,
>  bool engine_node_changed(struct engine_node *node);
>
>  /* Return true if the engine has run in the last iteration. */
> -bool engine_has_run(void);
> +bool engine_has_run(struct engine *e);
>
>  /* Returns true if during the last engine run we had to abort
processing. */
> -bool engine_aborted(void);
> +bool engine_aborted(struct engine *e);
>
>  /* Return a pointer to node data accessible for users outside the
processing
>   * engine. If the node data is not valid (e.g., last engine_run() failed
or
> @@ -265,7 +285,7 @@ void *engine_get_internal_data(struct engine_node
*node);
>      engine_set_node_state_at(node, state, OVS_SOURCE_LOCATOR)
>
>  /* Trigger a full recompute. */
> -void engine_trigger_recompute(void);
> +void engine_trigger_recompute(struct engine *e);
>
>  struct ed_ovsdb_index {
>      const char *name;
> diff --git a/northd/en-lflow.c b/northd/en-lflow.c
> index ffbdaf4e8..5451e0551 100644
> --- a/northd/en-lflow.c
> +++ b/northd/en-lflow.c
> @@ -32,7 +32,7 @@ VLOG_DEFINE_THIS_MODULE(en_lflow);
>
>  void en_lflow_run(struct engine_node *node, void *data OVS_UNUSED)
>  {
> -    const struct engine_context *eng_ctx = engine_get_context();
> +    const struct engine_context *eng_ctx = engine_get_context(node->e);
>
>      struct lflow_input lflow_input;
>
> diff --git a/northd/en-northd.c b/northd/en-northd.c
> index 79da7e1c4..064f9d93a 100644
> --- a/northd/en-northd.c
> +++ b/northd/en-northd.c
> @@ -32,7 +32,7 @@ VLOG_DEFINE_THIS_MODULE(en_northd);
>
>  void en_northd_run(struct engine_node *node, void *data)
>  {
> -    const struct engine_context *eng_ctx = engine_get_context();
> +    const struct engine_context *eng_ctx = engine_get_context(node->e);
>
>      struct northd_input input_data;
>
> diff --git a/northd/inc-proc-northd.c b/northd/inc-proc-northd.c
> index af55221e3..049fe226a 100644
> --- a/northd/inc-proc-northd.c
> +++ b/northd/inc-proc-northd.c
> @@ -33,6 +33,8 @@
>
>  VLOG_DEFINE_THIS_MODULE(inc_proc_northd);
>
> +static struct engine *flow_engine;
> +
>  #define NB_NODES \
>      NB_NODE(nb_global, "nb_global") \
>      NB_NODE(copp, "copp") \
> @@ -150,6 +152,8 @@ static ENGINE_NODE(lflow, "lflow");
>  void inc_proc_northd_init(struct ovsdb_idl_loop *nb,
>                            struct ovsdb_idl_loop *sb)
>  {
> +    engine_init_global();
> +
>      /* Define relationships between nodes where first argument is
dependent
>       * on the second argument */
>      engine_add_input(&en_northd, &en_nb_nb_global, NULL);
> @@ -229,7 +233,7 @@ void inc_proc_northd_init(struct ovsdb_idl_loop *nb,
>      struct ovsdb_idl_index *sbrec_chassis_by_hostname =
>          chassis_hostname_index_create(sb->idl);
>
> -    engine_init(&en_lflow, &engine_arg);
> +    flow_engine = engine_new(&en_lflow, &engine_arg, "flow_engine");
>
>      engine_ovsdb_node_add_index(&en_sb_chassis,
>                                  "sbrec_chassis_by_name",
> @@ -251,14 +255,14 @@ void inc_proc_northd_init(struct ovsdb_idl_loop *nb,
>  void inc_proc_northd_run(struct ovsdb_idl_txn *ovnnb_txn,
>                           struct ovsdb_idl_txn *ovnsb_txn,
>                           bool recompute) {
> -    engine_init_run();
> +    engine_init_run(flow_engine);
>
>      /* Force a full recompute if instructed to, for example, after a
NB/SB
>       * reconnect event.  However, make sure we don't overwrite an
existing
>       * force-recompute request if 'recompute' is false.
>       */
>      if (recompute) {
> -        engine_set_force_recompute(recompute);
> +        engine_set_force_recompute(flow_engine, recompute);
>      }
>
>      struct engine_context eng_ctx = {
> @@ -266,31 +270,31 @@ void inc_proc_northd_run(struct ovsdb_idl_txn
*ovnnb_txn,
>          .ovnsb_idl_txn = ovnsb_txn,
>      };
>
> -    engine_set_context(&eng_ctx);
> +    engine_set_context(flow_engine, &eng_ctx);
>
>      if (ovnnb_txn && ovnsb_txn) {
> -        engine_run(true);
> +        engine_run(flow_engine, true);
>      }
>
> -    if (!engine_has_run()) {
> -        if (engine_need_run()) {
> +    if (!engine_has_run(flow_engine)) {
> +        if (engine_need_run(flow_engine)) {
>              VLOG_DBG("engine did not run, force recompute next time.");
> -            engine_set_force_recompute(true);
> +            engine_set_force_recompute(flow_engine, true);
>              poll_immediate_wake();
>          } else {
>              VLOG_DBG("engine did not run, and it was not needed");
>          }
> -    } else if (engine_aborted()) {
> +    } else if (engine_aborted(flow_engine)) {
>          VLOG_DBG("engine was aborted, force recompute next time.");
> -        engine_set_force_recompute(true);
> +        engine_set_force_recompute(flow_engine, true);
>          poll_immediate_wake();
>      } else {
> -        engine_set_force_recompute(false);
> +        engine_set_force_recompute(flow_engine, false);
>      }
>  }
>
>  void inc_proc_northd_cleanup(void)
>  {
> -    engine_cleanup();
> -    engine_set_context(NULL);
> +    engine_set_context(flow_engine, NULL);
> +    engine_cleanup(flow_engine);
>  }
> --
> 2.33.1
>
> _______________________________________________
> dev mailing list
> dev@openvswitch.org
> https://mail.openvswitch.org/mailman/listinfo/ovs-dev
Lorenzo Bianconi Dec. 20, 2021, 6:31 p.m. UTC | #2
> On Mon, Dec 20, 2021 at 5:53 AM Lorenzo Bianconi <
> lorenzo.bianconi@redhat.com> wrote:
> >
> > Remove global state variable and move move inc-proc code in an isolated
> > strucuture. This is a preliminary patch to add the capability to run
> > multiple inc-proc engines.
> >
> 
> Thanks Lorenzo! Could you tell more about the use case when multiple
> inc-proc engines are required?

Hi Han,

I will rely on this patch to add an incremental processing engine for ovn
meters since in the current codebase ovs meters are updated when an ovn meter
is added or deleted but when it is updated.

Regards,
Lorenzo

> 
> Han
> 
> > Signed-off-by: Lorenzo Bianconi <lorenzo.bianconi@redhat.com>
> > ---
> > Changes since v2:
> > - cosmetics
> > - move unixctl commands registration in a dedicated routine
> > - add list-engines unixctl command
> >
> > Changes since v1:
> > - fix unixctl commands for IP engine.
> > ---
> >  controller/ovn-controller.c |  65 ++++++-----
> >  lib/inc-proc-eng.c          | 225 +++++++++++++++++++++++-------------
> >  lib/inc-proc-eng.h          |  42 +++++--
> >  northd/en-lflow.c           |   2 +-
> >  northd/en-northd.c          |   2 +-
> >  northd/inc-proc-northd.c    |  30 ++---
> >  6 files changed, 230 insertions(+), 136 deletions(-)
> >
> > diff --git a/controller/ovn-controller.c b/controller/ovn-controller.c
> > index 5069aedfc..86cb6f769 100644
> > --- a/controller/ovn-controller.c
> > +++ b/controller/ovn-controller.c
> > @@ -114,6 +114,8 @@ static unixctl_cb_func debug_delay_nb_cfg_report;
> >  #define OVS_NB_CFG_TS_NAME "ovn-nb-cfg-ts"
> >  #define OVS_STARTUP_TS_NAME "ovn-startup-ts"
> >
> > +static struct engine *flow_engine;
> > +
> >  static char *parse_options(int argc, char *argv[]);
> >  OVS_NO_RETURN static void usage(void);
> >
> > @@ -557,7 +559,7 @@ update_sb_db(struct ovsdb_idl *ovs_idl, struct
> ovsdb_idl *ovnsb_idl,
> >      }
> >      if (reset_ovnsb_idl_min_index && *reset_ovnsb_idl_min_index) {
> >          VLOG_INFO("Resetting southbound database cluster state");
> > -        engine_set_force_recompute(true);
> > +        engine_set_force_recompute(flow_engine, true);
> >          ovsdb_idl_reset_min_index(ovnsb_idl);
> >          *reset_ovnsb_idl_min_index = false;
> >      }
> > @@ -1011,7 +1013,8 @@ en_ofctrl_is_connected_cleanup(void *data
> OVS_UNUSED)
> >  static void
> >  en_ofctrl_is_connected_run(struct engine_node *node, void *data)
> >  {
> > -    struct controller_engine_ctx *ctrl_ctx =
> engine_get_context()->client_ctx;
> > +    struct controller_engine_ctx *ctrl_ctx =
> > +        engine_get_context(flow_engine)->client_ctx;
> >      struct ed_type_ofctrl_is_connected *of_data = data;
> >      if (of_data->connected != ofctrl_is_connected()) {
> >          of_data->connected = !of_data->connected;
> > @@ -1226,10 +1229,11 @@ init_binding_ctx(struct engine_node *node,
> >                  engine_get_input("SB_port_binding", node),
> >                  "datapath");
> >
> > -    struct controller_engine_ctx *ctrl_ctx =
> engine_get_context()->client_ctx;
> > +    struct controller_engine_ctx *ctrl_ctx =
> > +        engine_get_context(flow_engine)->client_ctx;
> >
> > -    b_ctx_in->ovnsb_idl_txn = engine_get_context()->ovnsb_idl_txn;
> > -    b_ctx_in->ovs_idl_txn = engine_get_context()->ovs_idl_txn;
> > +    b_ctx_in->ovnsb_idl_txn =
> engine_get_context(flow_engine)->ovnsb_idl_txn;
> > +    b_ctx_in->ovs_idl_txn = engine_get_context(flow_engine)->ovs_idl_txn;
> >      b_ctx_in->sbrec_datapath_binding_by_key =
> sbrec_datapath_binding_by_key;
> >      b_ctx_in->sbrec_port_binding_by_datapath =
> sbrec_port_binding_by_datapath;
> >      b_ctx_in->sbrec_port_binding_by_name = sbrec_port_binding_by_name;
> > @@ -2387,7 +2391,8 @@ en_lflow_output_run(struct engine_node *node, void
> *data)
> >          lflow_conj_ids_clear(&fo->conj_ids);
> >      }
> >
> > -    struct controller_engine_ctx *ctrl_ctx =
> engine_get_context()->client_ctx;
> > +    struct controller_engine_ctx *ctrl_ctx =
> > +        engine_get_context(flow_engine)->client_ctx;
> >
> >      fo->pd.lflow_cache = ctrl_ctx->lflow_cache;
> >
> > @@ -3040,7 +3045,7 @@ check_northd_version(struct ovsdb_idl *ovs_idl,
> struct ovsdb_idl *ovnsb_idl,
> >       * full recompute.
> >       */
> >      if (version_mismatch) {
> > -        engine_set_force_recompute(true);
> > +        engine_set_force_recompute(flow_engine, true);
> >      }
> >      version_mismatch = false;
> >      return true;
> > @@ -3206,6 +3211,8 @@ main(int argc, char *argv[])
> >      stopwatch_create(BFD_RUN_STOPWATCH_NAME, SW_MS);
> >      stopwatch_create(VIF_PLUG_RUN_STOPWATCH_NAME, SW_MS);
> >
> > +    engine_init_global();
> > +
> >      /* Define inc-proc-engine nodes. */
> >      ENGINE_NODE_WITH_CLEAR_TRACK_DATA_IS_VALID(ct_zones, "ct_zones");
> >      ENGINE_NODE_WITH_CLEAR_TRACK_DATA(runtime_data, "runtime_data");
> > @@ -3344,7 +3351,7 @@ main(int argc, char *argv[])
> >          .sb_idl = ovnsb_idl_loop.idl,
> >          .ovs_idl = ovs_idl_loop.idl,
> >      };
> > -    engine_init(&en_flow_output, &engine_arg);
> > +    flow_engine = engine_new(&en_flow_output, &engine_arg,
> "flow_engine");
> >
> >      engine_ovsdb_node_add_index(&en_sb_chassis, "name",
> sbrec_chassis_by_name);
> >      engine_ovsdb_node_add_index(&en_sb_multicast_group, "name_datapath",
> > @@ -3396,7 +3403,7 @@ main(int argc, char *argv[])
> >
> >      unixctl_command_register("recompute", "[deprecated]", 0, 0,
> >                               engine_recompute_cmd,
> > -                             NULL);
> > +                             flow_engine);
> >      unixctl_command_register("lflow-cache/flush", "", 0, 0,
> >                               lflow_cache_flush_cmd,
> >                               &lflow_output_data->pd);
> > @@ -3480,7 +3487,7 @@ main(int argc, char *argv[])
> >              goto loop_done;
> >          }
> >
> > -        engine_init_run();
> > +        engine_init_run(flow_engine);
> >
> >          struct ovsdb_idl_txn *ovs_idl_txn =
> ovsdb_idl_loop_run(&ovs_idl_loop);
> >          unsigned int new_ovs_cond_seqno
> > @@ -3488,7 +3495,7 @@ main(int argc, char *argv[])
> >          if (new_ovs_cond_seqno != ovs_cond_seqno) {
> >              if (!new_ovs_cond_seqno) {
> >                  VLOG_INFO("OVS IDL reconnected, force recompute.");
> > -                engine_set_force_recompute(true);
> > +                engine_set_force_recompute(flow_engine, true);
> >              }
> >              ovs_cond_seqno = new_ovs_cond_seqno;
> >          }
> > @@ -3506,7 +3513,7 @@ main(int argc, char *argv[])
> >          if (new_ovnsb_cond_seqno != ovnsb_cond_seqno) {
> >              if (!new_ovnsb_cond_seqno) {
> >                  VLOG_INFO("OVNSB IDL reconnected, force recompute.");
> > -                engine_set_force_recompute(true);
> > +                engine_set_force_recompute(flow_engine, true);
> >                  vif_plug_reset_idl_prime_counter();
> >              }
> >              ovnsb_cond_seqno = new_ovnsb_cond_seqno;
> > @@ -3518,7 +3525,7 @@ main(int argc, char *argv[])
> >              .client_ctx = &ctrl_engine_ctx
> >          };
> >
> > -        engine_set_context(&eng_ctx);
> > +        engine_set_context(flow_engine, &eng_ctx);
> >
> >          bool northd_version_match =
> >              check_northd_version(ovs_idl_loop.idl, ovnsb_idl_loop.idl,
> > @@ -3584,7 +3591,7 @@ main(int argc, char *argv[])
> >                                             &br_int_dp->capabilities :
> NULL,
> >                                             br_int ? br_int->name :
> NULL)) {
> >                  VLOG_INFO("OVS feature set changed, force recompute.");
> > -                engine_set_force_recompute(true);
> > +                engine_set_force_recompute(flow_engine, true);
> >              }
> >
> >              if (br_int) {
> > @@ -3619,9 +3626,9 @@ main(int argc, char *argv[])
> >                               * this round of engine_run and continue
> processing
> >                               * acculated changes incrementally later when
> >                               * ofctrl_can_put() returns true. */
> > -                            engine_run(false);
> > +                            engine_run(flow_engine, false);
> >                          } else {
> > -                            engine_run(true);
> > +                            engine_run(flow_engine, true);
> >                          }
> >                      } else {
> >                          /* Even if there's no SB DB transaction
> available,
> > @@ -3630,7 +3637,7 @@ main(int argc, char *argv[])
> >                           * If a recompute is required, the engine will
> abort,
> >                           * triggerring a full run in the next iteration.
> >                           */
> > -                        engine_run(false);
> > +                        engine_run(flow_engine, false);
> >                      }
> >                      stopwatch_stop(CONTROLLER_LOOP_STOPWATCH_NAME,
> >                                     time_msec());
> > @@ -3775,24 +3782,24 @@ main(int argc, char *argv[])
> >
> >              }
> >
> > -            if (!engine_has_run()) {
> > -                if (engine_need_run()) {
> > +            if (!engine_has_run(flow_engine)) {
> > +                if (engine_need_run(flow_engine)) {
> >                      VLOG_DBG("engine did not run, force recompute next
> time: "
> >                               "br_int %p, chassis %p", br_int, chassis);
> > -                    engine_set_force_recompute(true);
> > +                    engine_set_force_recompute(flow_engine, true);
> >                      poll_immediate_wake();
> >                  } else {
> >                      VLOG_DBG("engine did not run, and it was not needed"
> >                               " either: br_int %p, chassis %p",
> >                               br_int, chassis);
> >                  }
> > -            } else if (engine_aborted()) {
> > +            } else if (engine_aborted(flow_engine)) {
> >                  VLOG_DBG("engine was aborted, force recompute next time:
> "
> >                           "br_int %p, chassis %p", br_int, chassis);
> > -                engine_set_force_recompute(true);
> > +                engine_set_force_recompute(flow_engine, true);
> >                  poll_immediate_wake();
> >              } else {
> > -                engine_set_force_recompute(false);
> > +                engine_set_force_recompute(flow_engine, false);
> >              }
> >
> >              store_nb_cfg(ovnsb_idl_txn, ovs_idl_txn, chassis_private,
> > @@ -3846,7 +3853,7 @@ main(int argc, char *argv[])
> >
> >          if (!ovsdb_idl_loop_commit_and_wait(&ovnsb_idl_loop)) {
> >              VLOG_INFO("OVNSB commit failed, force recompute next time.");
> > -            engine_set_force_recompute(true);
> > +            engine_set_force_recompute(flow_engine, true);
> >          }
> >
> >          int ovs_txn_status =
> ovsdb_idl_loop_commit_and_wait(&ovs_idl_loop);
> > @@ -3896,8 +3903,8 @@ loop_done:
> >          }
> >      }
> >
> > -    engine_set_context(NULL);
> > -    engine_cleanup();
> > +    engine_set_context(flow_engine, NULL);
> > +    engine_cleanup(flow_engine);
> >
> >      /* It's time to exit.  Clean up the databases if we are not
> restarting */
> >      if (!restart) {
> > @@ -4152,9 +4159,9 @@ inject_pkt(struct unixctl_conn *conn, int argc
> OVS_UNUSED,
> >
> >  static void
> >  engine_recompute_cmd(struct unixctl_conn *conn OVS_UNUSED, int argc
> OVS_UNUSED,
> > -                     const char *argv[] OVS_UNUSED, void *arg OVS_UNUSED)
> > +                     const char *argv[] OVS_UNUSED, void *arg)
> >  {
> > -    engine_trigger_recompute();
> > +    engine_trigger_recompute(arg);
> >      unixctl_command_reply(conn, NULL);
> >  }
> >
> > @@ -4166,7 +4173,7 @@ lflow_cache_flush_cmd(struct unixctl_conn *conn
> OVS_UNUSED,
> >      VLOG_INFO("User triggered lflow cache flush.");
> >      struct lflow_output_persistent_data *fo_pd = arg_;
> >      lflow_cache_flush(fo_pd->lflow_cache);
> > -    engine_set_force_recompute(true);
> > +    engine_set_force_recompute(flow_engine, true);
> >      poll_immediate_wake();
> >      unixctl_command_reply(conn, NULL);
> >  }
> > diff --git a/lib/inc-proc-eng.c b/lib/inc-proc-eng.c
> > index 2958a55e3..9468b4ce0 100644
> > --- a/lib/inc-proc-eng.c
> > +++ b/lib/inc-proc-eng.c
> > @@ -33,12 +33,7 @@
> >
> >  VLOG_DEFINE_THIS_MODULE(inc_proc_eng);
> >
> > -static bool engine_force_recompute = false;
> > -static bool engine_run_aborted = false;
> > -static const struct engine_context *engine_context;
> > -
> > -static struct engine_node **engine_nodes;
> > -static size_t engine_n_nodes;
> > +static struct ovs_list engines = OVS_LIST_INITIALIZER(&engines);
> >
> >  static const char *engine_node_state_name[EN_STATE_MAX] = {
> >      [EN_STALE]     = "Stale",
> > @@ -52,21 +47,21 @@ engine_recompute(struct engine_node *node, bool
> allowed,
> >                   const char *reason_fmt, ...) OVS_PRINTF_FORMAT(3, 4);
> >
> >  void
> > -engine_set_force_recompute(bool val)
> > +engine_set_force_recompute(struct engine *e, bool val)
> >  {
> > -    engine_force_recompute = val;
> > +    e->engine_force_recompute = val;
> >  }
> >
> >  const struct engine_context *
> > -engine_get_context(void)
> > +engine_get_context(struct engine *e)
> >  {
> > -    return engine_context;
> > +    return e->engine_context;
> >  }
> >
> >  void
> > -engine_set_context(const struct engine_context *ctx)
> > +engine_set_context(struct engine *e, const struct engine_context *ctx)
> >  {
> > -    engine_context = ctx;
> > +    e->engine_context = ctx;
> >  }
> >
> >  /* Builds the topologically sorted 'sorted_nodes' array starting from
> > @@ -113,30 +108,53 @@ static void
> >  engine_clear_stats(struct unixctl_conn *conn, int argc OVS_UNUSED,
> >                     const char *argv[] OVS_UNUSED, void *arg OVS_UNUSED)
> >  {
> > -    for (size_t i = 0; i < engine_n_nodes; i++) {
> > -        struct engine_node *node = engine_nodes[i];
> > +    const char *target = argc == 2 ? argv[1] : NULL;
> > +    struct ds reply = DS_EMPTY_INITIALIZER;
> > +    struct engine *e;
> > +
> > +    ds_put_format(&reply, "no %s engine found", target ? target : "");
> > +    LIST_FOR_EACH (e, node, &engines) {
> > +        for (size_t i = 0; i < e->engine_n_nodes; i++) {
> > +            struct engine_node *node = e->engine_nodes[i];
> >
> > -        memset(&node->stats, 0, sizeof node->stats);
> > +            if (target && strcmp(target, e->name)) {
> > +                continue;
> > +            }
> > +            memset(&node->stats, 0, sizeof node->stats);
> > +            ds_clear(&reply);
> > +        }
> >      }
> > -    unixctl_command_reply(conn, NULL);
> > +
> > +    unixctl_command_reply(conn, ds_cstr(&reply));
> > +    ds_destroy(&reply);
> >  }
> >
> >  static void
> > -engine_dump_stats(struct unixctl_conn *conn, int argc OVS_UNUSED,
> > +engine_dump_stats(struct unixctl_conn *conn, int argc,
> >                    const char *argv[] OVS_UNUSED, void *arg OVS_UNUSED)
> >  {
> > +    const char *target = argc == 2 ? argv[1] : NULL;
> >      struct ds dump = DS_EMPTY_INITIALIZER;
> > +    struct engine *e;
> >
> > -    for (size_t i = 0; i < engine_n_nodes; i++) {
> > -        struct engine_node *node = engine_nodes[i];
> > +    LIST_FOR_EACH (e, node, &engines) {
> > +        for (size_t i = 0; i < e->engine_n_nodes; i++) {
> > +            struct engine_node *node = e->engine_nodes[i];
> >
> > -        ds_put_format(&dump,
> > -                      "Node: %s\n"
> > -                      "- recompute: %12"PRIu64"\n"
> > -                      "- compute:   %12"PRIu64"\n"
> > -                      "- abort:     %12"PRIu64"\n",
> > -                      node->name, node->stats.recompute,
> > -                      node->stats.compute, node->stats.abort);
> > +            if (target && strcmp(target, e->name)) {
> > +                continue;
> > +            }
> > +            ds_put_format(&dump,
> > +                          "Node: %s\n"
> > +                          "- recompute: %12"PRIu64"\n"
> > +                          "- compute:   %12"PRIu64"\n"
> > +                          "- abort:     %12"PRIu64"\n",
> > +                          node->name, node->stats.recompute,
> > +                          node->stats.compute, node->stats.abort);
> > +        }
> > +    }
> > +    if (ds_last(&dump) == EOF) {
> > +        ds_put_format(&dump, "no %s engine found", target ? target : "");
> >      }
> >      unixctl_command_reply(conn, ds_cstr(&dump));
> >
> > @@ -148,48 +166,91 @@ engine_trigger_recompute_cmd(struct unixctl_conn
> *conn, int argc OVS_UNUSED,
> >                               const char *argv[] OVS_UNUSED,
> >                               void *arg OVS_UNUSED)
> >  {
> > -    engine_trigger_recompute();
> > -    unixctl_command_reply(conn, NULL);
> > +    const char *target = argc == 2 ? argv[1] : NULL;
> > +    struct ds reply = DS_EMPTY_INITIALIZER;
> > +    struct engine *e;
> > +
> > +    ds_put_format(&reply, "no %s engine found", target ? target : "");
> > +    LIST_FOR_EACH (e, node, &engines) {
> > +        if (target && strcmp(target, e->name)) {
> > +            continue;
> > +        }
> > +        engine_trigger_recompute(e);
> > +        ds_clear(&reply);
> > +    }
> > +
> > +    unixctl_command_reply(conn, ds_cstr(&reply));
> > +    ds_destroy(&reply);
> >  }
> >
> > -void
> > -engine_init(struct engine_node *node, struct engine_arg *arg)
> > +static void
> > +engine_list_engines(struct unixctl_conn *conn, int argc OVS_UNUSED,
> > +                    const char *argv[] OVS_UNUSED,
> > +                    void *arg OVS_UNUSED)
> >  {
> > -    engine_nodes = engine_get_nodes(node, &engine_n_nodes);
> > +    struct ds reply = DS_EMPTY_INITIALIZER;
> > +    struct engine *e;
> >
> > -    for (size_t i = 0; i < engine_n_nodes; i++) {
> > -        if (engine_nodes[i]->init) {
> > -            engine_nodes[i]->data =
> > -                engine_nodes[i]->init(engine_nodes[i], arg);
> > -        } else {
> > -            engine_nodes[i]->data = NULL;
> > -        }
> > +    LIST_FOR_EACH (e, node, &engines) {
> > +            ds_put_format(&reply, "%s\n", e->name);
> >      }
> > +    unixctl_command_reply(conn, ds_cstr(&reply));
> > +    ds_destroy(&reply);
> > +}
> >
> > -    unixctl_command_register("inc-engine/show-stats", "", 0, 0,
> > +void
> > +engine_init_global(void)
> > +{
> > +    unixctl_command_register("inc-engine/show-stats", "[engine]", 0, 1,
> >                               engine_dump_stats, NULL);
> > -    unixctl_command_register("inc-engine/clear-stats", "", 0, 0,
> > +    unixctl_command_register("inc-engine/clear-stats", "[engine]", 0, 1,
> >                               engine_clear_stats, NULL);
> > -    unixctl_command_register("inc-engine/recompute", "", 0, 0,
> > +    unixctl_command_register("inc-engine/recompute", "[engine]", 0, 1,
> >                               engine_trigger_recompute_cmd, NULL);
> > +    unixctl_command_register("inc-engine/list-engines", "", 0, 0,
> > +                             engine_list_engines, NULL);
> > +}
> > +
> > +struct engine *
> > +engine_new(struct engine_node *node, struct engine_arg *arg,
> > +           const char *name)
> > +{
> > +    struct engine *e = xzalloc(sizeof *e);
> > +
> > +    e->engine_nodes = engine_get_nodes(node, &e->engine_n_nodes);
> > +    e->name = name;
> > +
> > +    for (size_t i = 0; i < e->engine_n_nodes; i++) {
> > +        if (e->engine_nodes[i]->init) {
> > +            e->engine_nodes[i]->data =
> > +                e->engine_nodes[i]->init(e->engine_nodes[i], arg);
> > +        } else {
> > +            e->engine_nodes[i]->data = NULL;
> > +        }
> > +        e->engine_nodes[i]->e = e;
> > +    }
> > +
> > +    ovs_list_push_back(&engines, &e->node);
> > +
> > +    return e;
> >  }
> >
> >  void
> > -engine_cleanup(void)
> > +engine_cleanup(struct engine *e)
> >  {
> > -    for (size_t i = 0; i < engine_n_nodes; i++) {
> > -        if (engine_nodes[i]->clear_tracked_data) {
> > -            engine_nodes[i]->clear_tracked_data(engine_nodes[i]->data);
> > +    for (size_t i = 0; i < e->engine_n_nodes; i++) {
> > +        if (e->engine_nodes[i]->clear_tracked_data) {
> > +            e->engine_nodes[i]->clear_tracked_data(
> > +                    e->engine_nodes[i]->data);
> >          }
> >
> > -        if (engine_nodes[i]->cleanup) {
> > -            engine_nodes[i]->cleanup(engine_nodes[i]->data);
> > +        if (e->engine_nodes[i]->cleanup) {
> > +            e->engine_nodes[i]->cleanup(e->engine_nodes[i]->data);
> >          }
> > -        free(engine_nodes[i]->data);
> > +        free(e->engine_nodes[i]->data);
> >      }
> > -    free(engine_nodes);
> > -    engine_nodes = NULL;
> > -    engine_n_nodes = 0;
> > +    ovs_list_remove(&e->node);
> > +    free(e->engine_nodes);
> >  }
> >
> >  struct engine_node *
> > @@ -284,10 +345,10 @@ engine_node_changed(struct engine_node *node)
> >  }
> >
> >  bool
> > -engine_has_run(void)
> > +engine_has_run(struct engine *e)
> >  {
> > -    for (size_t i = 0; i < engine_n_nodes; i++) {
> > -        if (engine_nodes[i]->state != EN_STALE) {
> > +    for (size_t i = 0; i < e->engine_n_nodes; i++) {
> > +        if (e->engine_nodes[i]->state != EN_STALE) {
> >              return true;
> >          }
> >      }
> > @@ -295,9 +356,9 @@ engine_has_run(void)
> >  }
> >
> >  bool
> > -engine_aborted(void)
> > +engine_aborted(struct engine *e)
> >  {
> > -    return engine_run_aborted;
> > +    return e->engine_run_aborted;
> >  }
> >
> >  void *
> > @@ -316,14 +377,15 @@ engine_get_internal_data(struct engine_node *node)
> >  }
> >
> >  void
> > -engine_init_run(void)
> > +engine_init_run(struct engine *e)
> >  {
> >      VLOG_DBG("Initializing new run");
> > -    for (size_t i = 0; i < engine_n_nodes; i++) {
> > -        engine_set_node_state(engine_nodes[i], EN_STALE);
> > +    for (size_t i = 0; i < e->engine_n_nodes; i++) {
> > +        engine_set_node_state(e->engine_nodes[i], EN_STALE);
> >
> > -        if (engine_nodes[i]->clear_tracked_data) {
> > -            engine_nodes[i]->clear_tracked_data(engine_nodes[i]->data);
> > +        if (e->engine_nodes[i]->clear_tracked_data) {
> > +            e->engine_nodes[i]->clear_tracked_data(
> > +                    e->engine_nodes[i]->data);
> >          }
> >      }
> >  }
> > @@ -397,7 +459,8 @@ engine_compute(struct engine_node *node, bool
> recompute_allowed)
> >  }
> >
> >  static void
> > -engine_run_node(struct engine_node *node, bool recompute_allowed)
> > +engine_run_node(struct engine *e, struct engine_node *node,
> > +                bool recompute_allowed)
> >  {
> >      if (!node->n_inputs) {
> >          /* Run the node handler which might change state. */
> > @@ -406,7 +469,7 @@ engine_run_node(struct engine_node *node, bool
> recompute_allowed)
> >          return;
> >      }
> >
> > -    if (engine_force_recompute) {
> > +    if (e->engine_force_recompute) {
> >          engine_recompute(node, recompute_allowed, "forced");
> >          return;
> >      }
> > @@ -447,41 +510,41 @@ engine_run_node(struct engine_node *node, bool
> recompute_allowed)
> >  }
> >
> >  void
> > -engine_run(bool recompute_allowed)
> > +engine_run(struct engine *e, bool recompute_allowed)
> >  {
> >      /* If the last run was aborted skip the incremental run because a
> >       * recompute is needed first.
> >       */
> > -    if (!recompute_allowed && engine_run_aborted) {
> > +    if (!recompute_allowed && e->engine_run_aborted) {
> >          return;
> >      }
> >
> > -    engine_run_aborted = false;
> > -    for (size_t i = 0; i < engine_n_nodes; i++) {
> > -        engine_run_node(engine_nodes[i], recompute_allowed);
> > +    e->engine_run_aborted = false;
> > +    for (size_t i = 0; i < e->engine_n_nodes; i++) {
> > +        engine_run_node(e, e->engine_nodes[i], recompute_allowed);
> >
> > -        if (engine_nodes[i]->state == EN_ABORTED) {
> > -            engine_nodes[i]->stats.abort++;
> > -            engine_run_aborted = true;
> > +        if (e->engine_nodes[i]->state == EN_ABORTED) {
> > +            e->engine_nodes[i]->stats.abort++;
> > +            e->engine_run_aborted = true;
> >              return;
> >          }
> >      }
> >  }
> >
> >  bool
> > -engine_need_run(void)
> > +engine_need_run(struct engine *e)
> >  {
> > -    for (size_t i = 0; i < engine_n_nodes; i++) {
> > +    for (size_t i = 0; i < e->engine_n_nodes; i++) {
> >          /* Check only leaf nodes for updates. */
> > -        if (engine_nodes[i]->n_inputs) {
> > +        if (e->engine_nodes[i]->n_inputs) {
> >              continue;
> >          }
> >
> > -        engine_nodes[i]->run(engine_nodes[i], engine_nodes[i]->data);
> > -        engine_nodes[i]->stats.recompute++;
> > -        VLOG_DBG("input node: %s, state: %s", engine_nodes[i]->name,
> > -                 engine_node_state_name[engine_nodes[i]->state]);
> > -        if (engine_nodes[i]->state == EN_UPDATED) {
> > +        e->engine_nodes[i]->run(e->engine_nodes[i],
> e->engine_nodes[i]->data);
> > +        e->engine_nodes[i]->stats.recompute++;
> > +        VLOG_DBG("input node: %s, state: %s", e->engine_nodes[i]->name,
> > +                 engine_node_state_name[e->engine_nodes[i]->state]);
> > +        if (e->engine_nodes[i]->state == EN_UPDATED) {
> >              return true;
> >          }
> >      }
> > @@ -489,9 +552,9 @@ engine_need_run(void)
> >  }
> >
> >  void
> > -engine_trigger_recompute(void)
> > +engine_trigger_recompute(struct engine *e)
> >  {
> >      VLOG_INFO("User triggered force recompute.");
> > -    engine_set_force_recompute(true);
> > +    engine_set_force_recompute(e, true);
> >      poll_immediate_wake();
> >  }
> > diff --git a/lib/inc-proc-eng.h b/lib/inc-proc-eng.h
> > index 9bfab1f7c..881242138 100644
> > --- a/lib/inc-proc-eng.h
> > +++ b/lib/inc-proc-eng.h
> > @@ -67,6 +67,7 @@
> >  #include <stdint.h>
> >
> >  #include "compiler.h"
> > +#include "openvswitch/list.h"
> >
> >  struct engine_context {
> >      struct ovsdb_idl_txn *ovs_idl_txn;
> > @@ -122,6 +123,8 @@ struct engine_stats {
> >  };
> >
> >  struct engine_node {
> > +    struct engine *e;
> > +
> >      /* A unique name for each node. */
> >      char *name;
> >
> > @@ -173,30 +176,47 @@ struct engine_node {
> >      struct engine_stats stats;
> >  };
> >
> > +struct engine {
> > +    struct ovs_list node;
> > +
> > +    const char *name;
> > +
> > +    struct engine_node **engine_nodes;
> > +    size_t engine_n_nodes;
> > +
> > +    bool engine_force_recompute;
> > +    bool engine_run_aborted;
> > +
> > +    const struct engine_context *engine_context;
> > +};
> > +
> > +void engine_init_global(void);
> > +
> >  /* Initialize the data for the engine nodes. It calls each node's
> >   * init() method if not NULL passing the user supplied 'arg'.
> >   * It should be called before the main loop. */
> > -void engine_init(struct engine_node *node, struct engine_arg *arg);
> > +struct engine *engine_new(struct engine_node *node, struct engine_arg
> *arg,
> > +                          const char *name);
> >
> >  /* Initialize the engine nodes for a new run. It should be called in the
> >   * main processing loop before every potential engine_run().
> >   */
> > -void engine_init_run(void);
> > +void engine_init_run(struct engine *e);
> >
> >  /* Execute the processing, which should be called in the main loop.
> >   * Updates the engine node's states accordingly. If 'recompute_allowed'
> is
> >   * false and a recompute is required by the current engine run then the
> engine
> >   * aborts.
> >   */
> > -void engine_run(bool recompute_allowed);
> > +void engine_run(struct engine *e, bool recompute_allowed);
> >
> >  /* Clean up the data for the engine nodes. It calls each node's
> >   * cleanup() method if not NULL. It should be called before the program
> >   * terminates. */
> > -void engine_cleanup(void);
> > +void engine_cleanup(struct engine *e);
> >
> >  /* Check if engine needs to run but didn't. */
> > -bool engine_need_run(void);
> > +bool engine_need_run(struct engine *e);
> >
> >  /* Get the input node with <name> for <node> */
> >  struct engine_node * engine_get_input(const char *input_name,
> > @@ -216,7 +236,7 @@ void engine_add_input(struct engine_node *node,
> struct engine_node *input,
> >   * in circumstances when we are not sure there is change or not, or
> >   * when there is change but the engine couldn't be executed in that
> >   * iteration, and the change can't be tracked across iterations */
> > -void engine_set_force_recompute(bool val);
> > +void engine_set_force_recompute(struct engine *e, bool val);
> >
> >  /* Return the current engine_context. The values in the context can be
> NULL
> >   * if the engine is run with allow_recompute == false in the current
> > @@ -224,9 +244,9 @@ void engine_set_force_recompute(bool val);
> >   * Therefore, it is the responsibility of the caller to check the context
> >   * values when called from change handlers.
> >   */
> > -const struct engine_context *engine_get_context(void);
> > +const struct engine_context *engine_get_context(struct engine *e);
> >
> > -void engine_set_context(const struct engine_context *);
> > +void engine_set_context(struct engine *e, const struct engine_context *);
> >
> >  void engine_set_node_state_at(struct engine_node *node,
> >                                enum engine_node_state state,
> > @@ -236,10 +256,10 @@ void engine_set_node_state_at(struct engine_node
> *node,
> >  bool engine_node_changed(struct engine_node *node);
> >
> >  /* Return true if the engine has run in the last iteration. */
> > -bool engine_has_run(void);
> > +bool engine_has_run(struct engine *e);
> >
> >  /* Returns true if during the last engine run we had to abort
> processing. */
> > -bool engine_aborted(void);
> > +bool engine_aborted(struct engine *e);
> >
> >  /* Return a pointer to node data accessible for users outside the
> processing
> >   * engine. If the node data is not valid (e.g., last engine_run() failed
> or
> > @@ -265,7 +285,7 @@ void *engine_get_internal_data(struct engine_node
> *node);
> >      engine_set_node_state_at(node, state, OVS_SOURCE_LOCATOR)
> >
> >  /* Trigger a full recompute. */
> > -void engine_trigger_recompute(void);
> > +void engine_trigger_recompute(struct engine *e);
> >
> >  struct ed_ovsdb_index {
> >      const char *name;
> > diff --git a/northd/en-lflow.c b/northd/en-lflow.c
> > index ffbdaf4e8..5451e0551 100644
> > --- a/northd/en-lflow.c
> > +++ b/northd/en-lflow.c
> > @@ -32,7 +32,7 @@ VLOG_DEFINE_THIS_MODULE(en_lflow);
> >
> >  void en_lflow_run(struct engine_node *node, void *data OVS_UNUSED)
> >  {
> > -    const struct engine_context *eng_ctx = engine_get_context();
> > +    const struct engine_context *eng_ctx = engine_get_context(node->e);
> >
> >      struct lflow_input lflow_input;
> >
> > diff --git a/northd/en-northd.c b/northd/en-northd.c
> > index 79da7e1c4..064f9d93a 100644
> > --- a/northd/en-northd.c
> > +++ b/northd/en-northd.c
> > @@ -32,7 +32,7 @@ VLOG_DEFINE_THIS_MODULE(en_northd);
> >
> >  void en_northd_run(struct engine_node *node, void *data)
> >  {
> > -    const struct engine_context *eng_ctx = engine_get_context();
> > +    const struct engine_context *eng_ctx = engine_get_context(node->e);
> >
> >      struct northd_input input_data;
> >
> > diff --git a/northd/inc-proc-northd.c b/northd/inc-proc-northd.c
> > index af55221e3..049fe226a 100644
> > --- a/northd/inc-proc-northd.c
> > +++ b/northd/inc-proc-northd.c
> > @@ -33,6 +33,8 @@
> >
> >  VLOG_DEFINE_THIS_MODULE(inc_proc_northd);
> >
> > +static struct engine *flow_engine;
> > +
> >  #define NB_NODES \
> >      NB_NODE(nb_global, "nb_global") \
> >      NB_NODE(copp, "copp") \
> > @@ -150,6 +152,8 @@ static ENGINE_NODE(lflow, "lflow");
> >  void inc_proc_northd_init(struct ovsdb_idl_loop *nb,
> >                            struct ovsdb_idl_loop *sb)
> >  {
> > +    engine_init_global();
> > +
> >      /* Define relationships between nodes where first argument is
> dependent
> >       * on the second argument */
> >      engine_add_input(&en_northd, &en_nb_nb_global, NULL);
> > @@ -229,7 +233,7 @@ void inc_proc_northd_init(struct ovsdb_idl_loop *nb,
> >      struct ovsdb_idl_index *sbrec_chassis_by_hostname =
> >          chassis_hostname_index_create(sb->idl);
> >
> > -    engine_init(&en_lflow, &engine_arg);
> > +    flow_engine = engine_new(&en_lflow, &engine_arg, "flow_engine");
> >
> >      engine_ovsdb_node_add_index(&en_sb_chassis,
> >                                  "sbrec_chassis_by_name",
> > @@ -251,14 +255,14 @@ void inc_proc_northd_init(struct ovsdb_idl_loop *nb,
> >  void inc_proc_northd_run(struct ovsdb_idl_txn *ovnnb_txn,
> >                           struct ovsdb_idl_txn *ovnsb_txn,
> >                           bool recompute) {
> > -    engine_init_run();
> > +    engine_init_run(flow_engine);
> >
> >      /* Force a full recompute if instructed to, for example, after a
> NB/SB
> >       * reconnect event.  However, make sure we don't overwrite an
> existing
> >       * force-recompute request if 'recompute' is false.
> >       */
> >      if (recompute) {
> > -        engine_set_force_recompute(recompute);
> > +        engine_set_force_recompute(flow_engine, recompute);
> >      }
> >
> >      struct engine_context eng_ctx = {
> > @@ -266,31 +270,31 @@ void inc_proc_northd_run(struct ovsdb_idl_txn
> *ovnnb_txn,
> >          .ovnsb_idl_txn = ovnsb_txn,
> >      };
> >
> > -    engine_set_context(&eng_ctx);
> > +    engine_set_context(flow_engine, &eng_ctx);
> >
> >      if (ovnnb_txn && ovnsb_txn) {
> > -        engine_run(true);
> > +        engine_run(flow_engine, true);
> >      }
> >
> > -    if (!engine_has_run()) {
> > -        if (engine_need_run()) {
> > +    if (!engine_has_run(flow_engine)) {
> > +        if (engine_need_run(flow_engine)) {
> >              VLOG_DBG("engine did not run, force recompute next time.");
> > -            engine_set_force_recompute(true);
> > +            engine_set_force_recompute(flow_engine, true);
> >              poll_immediate_wake();
> >          } else {
> >              VLOG_DBG("engine did not run, and it was not needed");
> >          }
> > -    } else if (engine_aborted()) {
> > +    } else if (engine_aborted(flow_engine)) {
> >          VLOG_DBG("engine was aborted, force recompute next time.");
> > -        engine_set_force_recompute(true);
> > +        engine_set_force_recompute(flow_engine, true);
> >          poll_immediate_wake();
> >      } else {
> > -        engine_set_force_recompute(false);
> > +        engine_set_force_recompute(flow_engine, false);
> >      }
> >  }
> >
> >  void inc_proc_northd_cleanup(void)
> >  {
> > -    engine_cleanup();
> > -    engine_set_context(NULL);
> > +    engine_set_context(flow_engine, NULL);
> > +    engine_cleanup(flow_engine);
> >  }
> > --
> > 2.33.1
> >
> > _______________________________________________
> > dev mailing list
> > dev@openvswitch.org
> > https://mail.openvswitch.org/mailman/listinfo/ovs-dev
Han Zhou Dec. 20, 2021, 7:29 p.m. UTC | #3
On Mon, Dec 20, 2021 at 10:32 AM Lorenzo Bianconi <
lorenzo.bianconi@redhat.com> wrote:
>
> > On Mon, Dec 20, 2021 at 5:53 AM Lorenzo Bianconi <
> > lorenzo.bianconi@redhat.com> wrote:
> > >
> > > Remove global state variable and move move inc-proc code in an
isolated
> > > strucuture. This is a preliminary patch to add the capability to run
> > > multiple inc-proc engines.
> > >
> >
> > Thanks Lorenzo! Could you tell more about the use case when multiple
> > inc-proc engines are required?
>
> Hi Han,
>
> I will rely on this patch to add an incremental processing engine for ovn
> meters since in the current codebase ovs meters are updated when an ovn
meter
> is added or deleted but when it is updated.

Ok, thanks for the information. For meters, there are different types, ones
directly specified in lflows, and ones referencing SB meter table records.
Are you talking about the SB meter records handling? For either case, it
seems that we need to handle lflow together. Are we sure it is better to
have a separate engine than handling in the existing engine? What would be
the engine nodes and dependencies for the new engine?

Thanks,
Han

>
> Regards,
> Lorenzo
>
Lorenzo Bianconi Dec. 20, 2021, 7:50 p.m. UTC | #4
On Dec 20, Han Zhou wrote:
> On Mon, Dec 20, 2021 at 10:32 AM Lorenzo Bianconi <
> lorenzo.bianconi@redhat.com> wrote:
> >
> > > On Mon, Dec 20, 2021 at 5:53 AM Lorenzo Bianconi <
> > > lorenzo.bianconi@redhat.com> wrote:
> > > >
> > > > Remove global state variable and move move inc-proc code in an
> isolated
> > > > strucuture. This is a preliminary patch to add the capability to run
> > > > multiple inc-proc engines.
> > > >
> > >
> > > Thanks Lorenzo! Could you tell more about the use case when multiple
> > > inc-proc engines are required?
> >
> > Hi Han,
> >
> > I will rely on this patch to add an incremental processing engine for ovn
> > meters since in the current codebase ovs meters are updated when an ovn
> meter
> > is added or deleted but when it is updated.
> 
> Ok, thanks for the information. For meters, there are different types, ones
> directly specified in lflows, and ones referencing SB meter table records.
> Are you talking about the SB meter records handling? For either case, it
> seems that we need to handle lflow together. Are we sure it is better to
> have a separate engine than handling in the existing engine? What would be
> the engine nodes and dependencies for the new engine?

I guess this patch is orthogonal to the ovn-meter case, it would be useful
even for northd incremental processing started by Mark Gray. What do you think?

Regarding the ovn-meter use case I am referencing to the SB meter table but we
want to just update the meter bands w/o any change to the flow using the meter
(the lflow just refers to the meter UUID, not to the bands).
If we use a single engine we would be stopped by a force recompute before
processing the SB meter table, losing in this case the new band info.
Am I missing something?

Regards,
Lorenzo

> 
> Thanks,
> Han
> 
> >
> > Regards,
> > Lorenzo
> >
Han Zhou Dec. 20, 2021, 8:09 p.m. UTC | #5
On Mon, Dec 20, 2021 at 11:50 AM Lorenzo Bianconi <
lorenzo.bianconi@redhat.com> wrote:
>
> On Dec 20, Han Zhou wrote:
> > On Mon, Dec 20, 2021 at 10:32 AM Lorenzo Bianconi <
> > lorenzo.bianconi@redhat.com> wrote:
> > >
> > > > On Mon, Dec 20, 2021 at 5:53 AM Lorenzo Bianconi <
> > > > lorenzo.bianconi@redhat.com> wrote:
> > > > >
> > > > > Remove global state variable and move move inc-proc code in an
> > isolated
> > > > > strucuture. This is a preliminary patch to add the capability to
run
> > > > > multiple inc-proc engines.
> > > > >
> > > >
> > > > Thanks Lorenzo! Could you tell more about the use case when multiple
> > > > inc-proc engines are required?
> > >
> > > Hi Han,
> > >
> > > I will rely on this patch to add an incremental processing engine for
ovn
> > > meters since in the current codebase ovs meters are updated when an
ovn
> > meter
> > > is added or deleted but when it is updated.
> >
> > Ok, thanks for the information. For meters, there are different types,
ones
> > directly specified in lflows, and ones referencing SB meter table
records.
> > Are you talking about the SB meter records handling? For either case, it
> > seems that we need to handle lflow together. Are we sure it is better to
> > have a separate engine than handling in the existing engine? What would
be
> > the engine nodes and dependencies for the new engine?
>
> I guess this patch is orthogonal to the ovn-meter case, it would be useful
> even for northd incremental processing started by Mark Gray. What do you
think?

Maybe, but not necessary yet. I am not sure if northd would need multiple
inc-proc instances. In general, unless two engine instances have completely
separate nodes, there would be repeated processing for the overlapping
nodes when we use separate inc-proc engine instances.

>
> Regarding the ovn-meter use case I am referencing to the SB meter table
but we
> want to just update the meter bands w/o any change to the flow using the
meter
> (the lflow just refers to the meter UUID, not to the bands).

Ok, if it is just to reflect SB meter table change to the desired meter
table (so that it is installed to OVS), and has nothing to do with logical
flows, then why do we need inc-proc engine for this purpose? Would it be
straightforward to use a FOR_EACH_TRACKED loop to handle and update? The
inc-proc engine is required when there are multiple input dependencies.

> If we use a single engine we would be stopped by a force recompute before
> processing the SB meter table, losing in this case the new band info.

Sorry that I didn't understand this. If the dependency is added to the
proper place in I-P engine, it should not lose any update processing. But
perhaps I missed your point here. Would it be better to list the dependency
you are thinking about and then it is easier to discuss?

Thanks,
Han

> Am I missing something?
>
> Regards,
> Lorenzo
>
> >
> > Thanks,
> > Han
> >
> > >
> > > Regards,
> > > Lorenzo
> > >
Lorenzo Bianconi Dec. 21, 2021, 2:25 p.m. UTC | #6
> On Mon, Dec 20, 2021 at 11:50 AM Lorenzo Bianconi <
> lorenzo.bianconi@redhat.com> wrote:
> >
> > On Dec 20, Han Zhou wrote:
> > > On Mon, Dec 20, 2021 at 10:32 AM Lorenzo Bianconi <
> > > lorenzo.bianconi@redhat.com> wrote:
> > > >
> > > > > On Mon, Dec 20, 2021 at 5:53 AM Lorenzo Bianconi <
> > > > > lorenzo.bianconi@redhat.com> wrote:
> > > > > >
> > > > > > Remove global state variable and move move inc-proc code in an
> > > isolated
> > > > > > strucuture. This is a preliminary patch to add the capability to
> run
> > > > > > multiple inc-proc engines.
> > > > > >
> > > > >
> > > > > Thanks Lorenzo! Could you tell more about the use case when multiple
> > > > > inc-proc engines are required?
> > > >
> > > > Hi Han,
> > > >
> > > > I will rely on this patch to add an incremental processing engine for
> ovn
> > > > meters since in the current codebase ovs meters are updated when an
> ovn
> > > meter
> > > > is added or deleted but when it is updated.
> > >
> > > Ok, thanks for the information. For meters, there are different types,
> ones
> > > directly specified in lflows, and ones referencing SB meter table
> records.
> > > Are you talking about the SB meter records handling? For either case, it
> > > seems that we need to handle lflow together. Are we sure it is better to
> > > have a separate engine than handling in the existing engine? What would
> be
> > > the engine nodes and dependencies for the new engine?
> >
> > I guess this patch is orthogonal to the ovn-meter case, it would be useful
> > even for northd incremental processing started by Mark Gray. What do you
> think?
> 
> Maybe, but not necessary yet. I am not sure if northd would need multiple
> inc-proc instances. In general, unless two engine instances have completely
> separate nodes, there would be repeated processing for the overlapping
> nodes when we use separate inc-proc engine instances.

Since IP is managed as a library it would be nice to run it multiple times without
introduce any dependency (e.g. global variable or symbols).
Moreover, I think, removing the global symbols, the code would be even easier to
maintain and/or debug.

> 
> >
> > Regarding the ovn-meter use case I am referencing to the SB meter table
> but we
> > want to just update the meter bands w/o any change to the flow using the
> meter
> > (the lflow just refers to the meter UUID, not to the bands).
> 
> Ok, if it is just to reflect SB meter table change to the desired meter
> table (so that it is installed to OVS), and has nothing to do with logical
> flows, then why do we need inc-proc engine for this purpose? Would it be
> straightforward to use a FOR_EACH_TRACKED loop to handle and update? The
> inc-proc engine is required when there are multiple input dependencies.
> 
> > If we use a single engine we would be stopped by a force recompute before
> > processing the SB meter table, losing in this case the new band info.
> 
> Sorry that I didn't understand this. If the dependency is added to the
> proper place in I-P engine, it should not lose any update processing. But
> perhaps I missed your point here. Would it be better to list the dependency
> you are thinking about and then it is easier to discuss?

I think we can defer this discussion when I will post the series (I do not think
it is related to this patch). Agree?

Regards,
Lorenzo

> 
> Thanks,
> Han
> 
> > Am I missing something?
> >
> > Regards,
> > Lorenzo
> >
> > >
> > > Thanks,
> > > Han
> > >
> > > >
> > > > Regards,
> > > > Lorenzo
> > > >
Han Zhou Dec. 21, 2021, 6:25 p.m. UTC | #7
On Tue, Dec 21, 2021 at 6:25 AM Lorenzo Bianconi <
lorenzo.bianconi@redhat.com> wrote:
>
> > On Mon, Dec 20, 2021 at 11:50 AM Lorenzo Bianconi <
> > lorenzo.bianconi@redhat.com> wrote:
> > >
> > > On Dec 20, Han Zhou wrote:
> > > > On Mon, Dec 20, 2021 at 10:32 AM Lorenzo Bianconi <
> > > > lorenzo.bianconi@redhat.com> wrote:
> > > > >
> > > > > > On Mon, Dec 20, 2021 at 5:53 AM Lorenzo Bianconi <
> > > > > > lorenzo.bianconi@redhat.com> wrote:
> > > > > > >
> > > > > > > Remove global state variable and move move inc-proc code in an
> > > > isolated
> > > > > > > strucuture. This is a preliminary patch to add the capability
to
> > run
> > > > > > > multiple inc-proc engines.
> > > > > > >
> > > > > >
> > > > > > Thanks Lorenzo! Could you tell more about the use case when
multiple
> > > > > > inc-proc engines are required?
> > > > >
> > > > > Hi Han,
> > > > >
> > > > > I will rely on this patch to add an incremental processing engine
for
> > ovn
> > > > > meters since in the current codebase ovs meters are updated when
an
> > ovn
> > > > meter
> > > > > is added or deleted but when it is updated.
> > > >
> > > > Ok, thanks for the information. For meters, there are different
types,
> > ones
> > > > directly specified in lflows, and ones referencing SB meter table
> > records.
> > > > Are you talking about the SB meter records handling? For either
case, it
> > > > seems that we need to handle lflow together. Are we sure it is
better to
> > > > have a separate engine than handling in the existing engine? What
would
> > be
> > > > the engine nodes and dependencies for the new engine?
> > >
> > > I guess this patch is orthogonal to the ovn-meter case, it would be
useful
> > > even for northd incremental processing started by Mark Gray. What do
you
> > think?
> >
> > Maybe, but not necessary yet. I am not sure if northd would need
multiple
> > inc-proc instances. In general, unless two engine instances have
completely
> > separate nodes, there would be repeated processing for the overlapping
> > nodes when we use separate inc-proc engine instances.
>
> Since IP is managed as a library it would be nice to run it multiple
times without
> introduce any dependency (e.g. global variable or symbols).
> Moreover, I think, removing the global symbols, the code would be even
easier to
> maintain and/or debug.
>
I am open to this, but I would avoid changes if not necessary. The global
states are currently encapsulated in the I-P module, which I think is ok as
long as the engine is per process level. I didn't find it easier (or
harder) to debug by wrapping them in a structure. I would consider this
patch whenever multiple instances of the I-P engine are needed within a
program.

> >
> > >
> > > Regarding the ovn-meter use case I am referencing to the SB meter
table
> > but we
> > > want to just update the meter bands w/o any change to the flow using
the
> > meter
> > > (the lflow just refers to the meter UUID, not to the bands).
> >
> > Ok, if it is just to reflect SB meter table change to the desired meter
> > table (so that it is installed to OVS), and has nothing to do with
logical
> > flows, then why do we need inc-proc engine for this purpose? Would it be
> > straightforward to use a FOR_EACH_TRACKED loop to handle and update? The
> > inc-proc engine is required when there are multiple input dependencies.
> >
> > > If we use a single engine we would be stopped by a force recompute
before
> > > processing the SB meter table, losing in this case the new band info.
> >
> > Sorry that I didn't understand this. If the dependency is added to the
> > proper place in I-P engine, it should not lose any update processing.
But
> > perhaps I missed your point here. Would it be better to list the
dependency
> > you are thinking about and then it is easier to discuss?
>
> I think we can defer this discussion when I will post the series (I do
not think
> it is related to this patch). Agree?

Yes, of course. Since this patch is needed only if multiple instances of
I-P engine is used, I'd suggest reviewing it together with the series.
There were situations in the past where people put lots of effort on
something but received late feedback pointing out design problems wasting
lots of their time. I wanted to avoid that as much as possible (and I am
sorry if my feedback for this is already late). But if posting the rest of
patches saves time in this case, let's do it that way :)

Thanks,
Han

>
> Regards,
> Lorenzo
>
> >
> > Thanks,
> > Han
> >
> > > Am I missing something?
> > >
> > > Regards,
> > > Lorenzo
> > >
> > > >
> > > > Thanks,
> > > > Han
> > > >
> > > > >
> > > > > Regards,
> > > > > Lorenzo
> > > > >
diff mbox series

Patch

diff --git a/controller/ovn-controller.c b/controller/ovn-controller.c
index 5069aedfc..86cb6f769 100644
--- a/controller/ovn-controller.c
+++ b/controller/ovn-controller.c
@@ -114,6 +114,8 @@  static unixctl_cb_func debug_delay_nb_cfg_report;
 #define OVS_NB_CFG_TS_NAME "ovn-nb-cfg-ts"
 #define OVS_STARTUP_TS_NAME "ovn-startup-ts"
 
+static struct engine *flow_engine;
+
 static char *parse_options(int argc, char *argv[]);
 OVS_NO_RETURN static void usage(void);
 
@@ -557,7 +559,7 @@  update_sb_db(struct ovsdb_idl *ovs_idl, struct ovsdb_idl *ovnsb_idl,
     }
     if (reset_ovnsb_idl_min_index && *reset_ovnsb_idl_min_index) {
         VLOG_INFO("Resetting southbound database cluster state");
-        engine_set_force_recompute(true);
+        engine_set_force_recompute(flow_engine, true);
         ovsdb_idl_reset_min_index(ovnsb_idl);
         *reset_ovnsb_idl_min_index = false;
     }
@@ -1011,7 +1013,8 @@  en_ofctrl_is_connected_cleanup(void *data OVS_UNUSED)
 static void
 en_ofctrl_is_connected_run(struct engine_node *node, void *data)
 {
-    struct controller_engine_ctx *ctrl_ctx = engine_get_context()->client_ctx;
+    struct controller_engine_ctx *ctrl_ctx =
+        engine_get_context(flow_engine)->client_ctx;
     struct ed_type_ofctrl_is_connected *of_data = data;
     if (of_data->connected != ofctrl_is_connected()) {
         of_data->connected = !of_data->connected;
@@ -1226,10 +1229,11 @@  init_binding_ctx(struct engine_node *node,
                 engine_get_input("SB_port_binding", node),
                 "datapath");
 
-    struct controller_engine_ctx *ctrl_ctx = engine_get_context()->client_ctx;
+    struct controller_engine_ctx *ctrl_ctx =
+        engine_get_context(flow_engine)->client_ctx;
 
-    b_ctx_in->ovnsb_idl_txn = engine_get_context()->ovnsb_idl_txn;
-    b_ctx_in->ovs_idl_txn = engine_get_context()->ovs_idl_txn;
+    b_ctx_in->ovnsb_idl_txn = engine_get_context(flow_engine)->ovnsb_idl_txn;
+    b_ctx_in->ovs_idl_txn = engine_get_context(flow_engine)->ovs_idl_txn;
     b_ctx_in->sbrec_datapath_binding_by_key = sbrec_datapath_binding_by_key;
     b_ctx_in->sbrec_port_binding_by_datapath = sbrec_port_binding_by_datapath;
     b_ctx_in->sbrec_port_binding_by_name = sbrec_port_binding_by_name;
@@ -2387,7 +2391,8 @@  en_lflow_output_run(struct engine_node *node, void *data)
         lflow_conj_ids_clear(&fo->conj_ids);
     }
 
-    struct controller_engine_ctx *ctrl_ctx = engine_get_context()->client_ctx;
+    struct controller_engine_ctx *ctrl_ctx =
+        engine_get_context(flow_engine)->client_ctx;
 
     fo->pd.lflow_cache = ctrl_ctx->lflow_cache;
 
@@ -3040,7 +3045,7 @@  check_northd_version(struct ovsdb_idl *ovs_idl, struct ovsdb_idl *ovnsb_idl,
      * full recompute.
      */
     if (version_mismatch) {
-        engine_set_force_recompute(true);
+        engine_set_force_recompute(flow_engine, true);
     }
     version_mismatch = false;
     return true;
@@ -3206,6 +3211,8 @@  main(int argc, char *argv[])
     stopwatch_create(BFD_RUN_STOPWATCH_NAME, SW_MS);
     stopwatch_create(VIF_PLUG_RUN_STOPWATCH_NAME, SW_MS);
 
+    engine_init_global();
+
     /* Define inc-proc-engine nodes. */
     ENGINE_NODE_WITH_CLEAR_TRACK_DATA_IS_VALID(ct_zones, "ct_zones");
     ENGINE_NODE_WITH_CLEAR_TRACK_DATA(runtime_data, "runtime_data");
@@ -3344,7 +3351,7 @@  main(int argc, char *argv[])
         .sb_idl = ovnsb_idl_loop.idl,
         .ovs_idl = ovs_idl_loop.idl,
     };
-    engine_init(&en_flow_output, &engine_arg);
+    flow_engine = engine_new(&en_flow_output, &engine_arg, "flow_engine");
 
     engine_ovsdb_node_add_index(&en_sb_chassis, "name", sbrec_chassis_by_name);
     engine_ovsdb_node_add_index(&en_sb_multicast_group, "name_datapath",
@@ -3396,7 +3403,7 @@  main(int argc, char *argv[])
 
     unixctl_command_register("recompute", "[deprecated]", 0, 0,
                              engine_recompute_cmd,
-                             NULL);
+                             flow_engine);
     unixctl_command_register("lflow-cache/flush", "", 0, 0,
                              lflow_cache_flush_cmd,
                              &lflow_output_data->pd);
@@ -3480,7 +3487,7 @@  main(int argc, char *argv[])
             goto loop_done;
         }
 
-        engine_init_run();
+        engine_init_run(flow_engine);
 
         struct ovsdb_idl_txn *ovs_idl_txn = ovsdb_idl_loop_run(&ovs_idl_loop);
         unsigned int new_ovs_cond_seqno
@@ -3488,7 +3495,7 @@  main(int argc, char *argv[])
         if (new_ovs_cond_seqno != ovs_cond_seqno) {
             if (!new_ovs_cond_seqno) {
                 VLOG_INFO("OVS IDL reconnected, force recompute.");
-                engine_set_force_recompute(true);
+                engine_set_force_recompute(flow_engine, true);
             }
             ovs_cond_seqno = new_ovs_cond_seqno;
         }
@@ -3506,7 +3513,7 @@  main(int argc, char *argv[])
         if (new_ovnsb_cond_seqno != ovnsb_cond_seqno) {
             if (!new_ovnsb_cond_seqno) {
                 VLOG_INFO("OVNSB IDL reconnected, force recompute.");
-                engine_set_force_recompute(true);
+                engine_set_force_recompute(flow_engine, true);
                 vif_plug_reset_idl_prime_counter();
             }
             ovnsb_cond_seqno = new_ovnsb_cond_seqno;
@@ -3518,7 +3525,7 @@  main(int argc, char *argv[])
             .client_ctx = &ctrl_engine_ctx
         };
 
-        engine_set_context(&eng_ctx);
+        engine_set_context(flow_engine, &eng_ctx);
 
         bool northd_version_match =
             check_northd_version(ovs_idl_loop.idl, ovnsb_idl_loop.idl,
@@ -3584,7 +3591,7 @@  main(int argc, char *argv[])
                                            &br_int_dp->capabilities : NULL,
                                            br_int ? br_int->name : NULL)) {
                 VLOG_INFO("OVS feature set changed, force recompute.");
-                engine_set_force_recompute(true);
+                engine_set_force_recompute(flow_engine, true);
             }
 
             if (br_int) {
@@ -3619,9 +3626,9 @@  main(int argc, char *argv[])
                              * this round of engine_run and continue processing
                              * acculated changes incrementally later when
                              * ofctrl_can_put() returns true. */
-                            engine_run(false);
+                            engine_run(flow_engine, false);
                         } else {
-                            engine_run(true);
+                            engine_run(flow_engine, true);
                         }
                     } else {
                         /* Even if there's no SB DB transaction available,
@@ -3630,7 +3637,7 @@  main(int argc, char *argv[])
                          * If a recompute is required, the engine will abort,
                          * triggerring a full run in the next iteration.
                          */
-                        engine_run(false);
+                        engine_run(flow_engine, false);
                     }
                     stopwatch_stop(CONTROLLER_LOOP_STOPWATCH_NAME,
                                    time_msec());
@@ -3775,24 +3782,24 @@  main(int argc, char *argv[])
 
             }
 
-            if (!engine_has_run()) {
-                if (engine_need_run()) {
+            if (!engine_has_run(flow_engine)) {
+                if (engine_need_run(flow_engine)) {
                     VLOG_DBG("engine did not run, force recompute next time: "
                              "br_int %p, chassis %p", br_int, chassis);
-                    engine_set_force_recompute(true);
+                    engine_set_force_recompute(flow_engine, true);
                     poll_immediate_wake();
                 } else {
                     VLOG_DBG("engine did not run, and it was not needed"
                              " either: br_int %p, chassis %p",
                              br_int, chassis);
                 }
-            } else if (engine_aborted()) {
+            } else if (engine_aborted(flow_engine)) {
                 VLOG_DBG("engine was aborted, force recompute next time: "
                          "br_int %p, chassis %p", br_int, chassis);
-                engine_set_force_recompute(true);
+                engine_set_force_recompute(flow_engine, true);
                 poll_immediate_wake();
             } else {
-                engine_set_force_recompute(false);
+                engine_set_force_recompute(flow_engine, false);
             }
 
             store_nb_cfg(ovnsb_idl_txn, ovs_idl_txn, chassis_private,
@@ -3846,7 +3853,7 @@  main(int argc, char *argv[])
 
         if (!ovsdb_idl_loop_commit_and_wait(&ovnsb_idl_loop)) {
             VLOG_INFO("OVNSB commit failed, force recompute next time.");
-            engine_set_force_recompute(true);
+            engine_set_force_recompute(flow_engine, true);
         }
 
         int ovs_txn_status = ovsdb_idl_loop_commit_and_wait(&ovs_idl_loop);
@@ -3896,8 +3903,8 @@  loop_done:
         }
     }
 
-    engine_set_context(NULL);
-    engine_cleanup();
+    engine_set_context(flow_engine, NULL);
+    engine_cleanup(flow_engine);
 
     /* It's time to exit.  Clean up the databases if we are not restarting */
     if (!restart) {
@@ -4152,9 +4159,9 @@  inject_pkt(struct unixctl_conn *conn, int argc OVS_UNUSED,
 
 static void
 engine_recompute_cmd(struct unixctl_conn *conn OVS_UNUSED, int argc OVS_UNUSED,
-                     const char *argv[] OVS_UNUSED, void *arg OVS_UNUSED)
+                     const char *argv[] OVS_UNUSED, void *arg)
 {
-    engine_trigger_recompute();
+    engine_trigger_recompute(arg);
     unixctl_command_reply(conn, NULL);
 }
 
@@ -4166,7 +4173,7 @@  lflow_cache_flush_cmd(struct unixctl_conn *conn OVS_UNUSED,
     VLOG_INFO("User triggered lflow cache flush.");
     struct lflow_output_persistent_data *fo_pd = arg_;
     lflow_cache_flush(fo_pd->lflow_cache);
-    engine_set_force_recompute(true);
+    engine_set_force_recompute(flow_engine, true);
     poll_immediate_wake();
     unixctl_command_reply(conn, NULL);
 }
diff --git a/lib/inc-proc-eng.c b/lib/inc-proc-eng.c
index 2958a55e3..9468b4ce0 100644
--- a/lib/inc-proc-eng.c
+++ b/lib/inc-proc-eng.c
@@ -33,12 +33,7 @@ 
 
 VLOG_DEFINE_THIS_MODULE(inc_proc_eng);
 
-static bool engine_force_recompute = false;
-static bool engine_run_aborted = false;
-static const struct engine_context *engine_context;
-
-static struct engine_node **engine_nodes;
-static size_t engine_n_nodes;
+static struct ovs_list engines = OVS_LIST_INITIALIZER(&engines);
 
 static const char *engine_node_state_name[EN_STATE_MAX] = {
     [EN_STALE]     = "Stale",
@@ -52,21 +47,21 @@  engine_recompute(struct engine_node *node, bool allowed,
                  const char *reason_fmt, ...) OVS_PRINTF_FORMAT(3, 4);
 
 void
-engine_set_force_recompute(bool val)
+engine_set_force_recompute(struct engine *e, bool val)
 {
-    engine_force_recompute = val;
+    e->engine_force_recompute = val;
 }
 
 const struct engine_context *
-engine_get_context(void)
+engine_get_context(struct engine *e)
 {
-    return engine_context;
+    return e->engine_context;
 }
 
 void
-engine_set_context(const struct engine_context *ctx)
+engine_set_context(struct engine *e, const struct engine_context *ctx)
 {
-    engine_context = ctx;
+    e->engine_context = ctx;
 }
 
 /* Builds the topologically sorted 'sorted_nodes' array starting from
@@ -113,30 +108,53 @@  static void
 engine_clear_stats(struct unixctl_conn *conn, int argc OVS_UNUSED,
                    const char *argv[] OVS_UNUSED, void *arg OVS_UNUSED)
 {
-    for (size_t i = 0; i < engine_n_nodes; i++) {
-        struct engine_node *node = engine_nodes[i];
+    const char *target = argc == 2 ? argv[1] : NULL;
+    struct ds reply = DS_EMPTY_INITIALIZER;
+    struct engine *e;
+
+    ds_put_format(&reply, "no %s engine found", target ? target : "");
+    LIST_FOR_EACH (e, node, &engines) {
+        for (size_t i = 0; i < e->engine_n_nodes; i++) {
+            struct engine_node *node = e->engine_nodes[i];
 
-        memset(&node->stats, 0, sizeof node->stats);
+            if (target && strcmp(target, e->name)) {
+                continue;
+            }
+            memset(&node->stats, 0, sizeof node->stats);
+            ds_clear(&reply);
+        }
     }
-    unixctl_command_reply(conn, NULL);
+
+    unixctl_command_reply(conn, ds_cstr(&reply));
+    ds_destroy(&reply);
 }
 
 static void
-engine_dump_stats(struct unixctl_conn *conn, int argc OVS_UNUSED,
+engine_dump_stats(struct unixctl_conn *conn, int argc,
                   const char *argv[] OVS_UNUSED, void *arg OVS_UNUSED)
 {
+    const char *target = argc == 2 ? argv[1] : NULL;
     struct ds dump = DS_EMPTY_INITIALIZER;
+    struct engine *e;
 
-    for (size_t i = 0; i < engine_n_nodes; i++) {
-        struct engine_node *node = engine_nodes[i];
+    LIST_FOR_EACH (e, node, &engines) {
+        for (size_t i = 0; i < e->engine_n_nodes; i++) {
+            struct engine_node *node = e->engine_nodes[i];
 
-        ds_put_format(&dump,
-                      "Node: %s\n"
-                      "- recompute: %12"PRIu64"\n"
-                      "- compute:   %12"PRIu64"\n"
-                      "- abort:     %12"PRIu64"\n",
-                      node->name, node->stats.recompute,
-                      node->stats.compute, node->stats.abort);
+            if (target && strcmp(target, e->name)) {
+                continue;
+            }
+            ds_put_format(&dump,
+                          "Node: %s\n"
+                          "- recompute: %12"PRIu64"\n"
+                          "- compute:   %12"PRIu64"\n"
+                          "- abort:     %12"PRIu64"\n",
+                          node->name, node->stats.recompute,
+                          node->stats.compute, node->stats.abort);
+        }
+    }
+    if (ds_last(&dump) == EOF) {
+        ds_put_format(&dump, "no %s engine found", target ? target : "");
     }
     unixctl_command_reply(conn, ds_cstr(&dump));
 
@@ -148,48 +166,91 @@  engine_trigger_recompute_cmd(struct unixctl_conn *conn, int argc OVS_UNUSED,
                              const char *argv[] OVS_UNUSED,
                              void *arg OVS_UNUSED)
 {
-    engine_trigger_recompute();
-    unixctl_command_reply(conn, NULL);
+    const char *target = argc == 2 ? argv[1] : NULL;
+    struct ds reply = DS_EMPTY_INITIALIZER;
+    struct engine *e;
+
+    ds_put_format(&reply, "no %s engine found", target ? target : "");
+    LIST_FOR_EACH (e, node, &engines) {
+        if (target && strcmp(target, e->name)) {
+            continue;
+        }
+        engine_trigger_recompute(e);
+        ds_clear(&reply);
+    }
+
+    unixctl_command_reply(conn, ds_cstr(&reply));
+    ds_destroy(&reply);
 }
 
-void
-engine_init(struct engine_node *node, struct engine_arg *arg)
+static void
+engine_list_engines(struct unixctl_conn *conn, int argc OVS_UNUSED,
+                    const char *argv[] OVS_UNUSED,
+                    void *arg OVS_UNUSED)
 {
-    engine_nodes = engine_get_nodes(node, &engine_n_nodes);
+    struct ds reply = DS_EMPTY_INITIALIZER;
+    struct engine *e;
 
-    for (size_t i = 0; i < engine_n_nodes; i++) {
-        if (engine_nodes[i]->init) {
-            engine_nodes[i]->data =
-                engine_nodes[i]->init(engine_nodes[i], arg);
-        } else {
-            engine_nodes[i]->data = NULL;
-        }
+    LIST_FOR_EACH (e, node, &engines) {
+            ds_put_format(&reply, "%s\n", e->name);
     }
+    unixctl_command_reply(conn, ds_cstr(&reply));
+    ds_destroy(&reply);
+}
 
-    unixctl_command_register("inc-engine/show-stats", "", 0, 0,
+void
+engine_init_global(void)
+{
+    unixctl_command_register("inc-engine/show-stats", "[engine]", 0, 1,
                              engine_dump_stats, NULL);
-    unixctl_command_register("inc-engine/clear-stats", "", 0, 0,
+    unixctl_command_register("inc-engine/clear-stats", "[engine]", 0, 1,
                              engine_clear_stats, NULL);
-    unixctl_command_register("inc-engine/recompute", "", 0, 0,
+    unixctl_command_register("inc-engine/recompute", "[engine]", 0, 1,
                              engine_trigger_recompute_cmd, NULL);
+    unixctl_command_register("inc-engine/list-engines", "", 0, 0,
+                             engine_list_engines, NULL);
+}
+
+struct engine *
+engine_new(struct engine_node *node, struct engine_arg *arg,
+           const char *name)
+{
+    struct engine *e = xzalloc(sizeof *e);
+
+    e->engine_nodes = engine_get_nodes(node, &e->engine_n_nodes);
+    e->name = name;
+
+    for (size_t i = 0; i < e->engine_n_nodes; i++) {
+        if (e->engine_nodes[i]->init) {
+            e->engine_nodes[i]->data =
+                e->engine_nodes[i]->init(e->engine_nodes[i], arg);
+        } else {
+            e->engine_nodes[i]->data = NULL;
+        }
+        e->engine_nodes[i]->e = e;
+    }
+
+    ovs_list_push_back(&engines, &e->node);
+
+    return e;
 }
 
 void
-engine_cleanup(void)
+engine_cleanup(struct engine *e)
 {
-    for (size_t i = 0; i < engine_n_nodes; i++) {
-        if (engine_nodes[i]->clear_tracked_data) {
-            engine_nodes[i]->clear_tracked_data(engine_nodes[i]->data);
+    for (size_t i = 0; i < e->engine_n_nodes; i++) {
+        if (e->engine_nodes[i]->clear_tracked_data) {
+            e->engine_nodes[i]->clear_tracked_data(
+                    e->engine_nodes[i]->data);
         }
 
-        if (engine_nodes[i]->cleanup) {
-            engine_nodes[i]->cleanup(engine_nodes[i]->data);
+        if (e->engine_nodes[i]->cleanup) {
+            e->engine_nodes[i]->cleanup(e->engine_nodes[i]->data);
         }
-        free(engine_nodes[i]->data);
+        free(e->engine_nodes[i]->data);
     }
-    free(engine_nodes);
-    engine_nodes = NULL;
-    engine_n_nodes = 0;
+    ovs_list_remove(&e->node);
+    free(e->engine_nodes);
 }
 
 struct engine_node *
@@ -284,10 +345,10 @@  engine_node_changed(struct engine_node *node)
 }
 
 bool
-engine_has_run(void)
+engine_has_run(struct engine *e)
 {
-    for (size_t i = 0; i < engine_n_nodes; i++) {
-        if (engine_nodes[i]->state != EN_STALE) {
+    for (size_t i = 0; i < e->engine_n_nodes; i++) {
+        if (e->engine_nodes[i]->state != EN_STALE) {
             return true;
         }
     }
@@ -295,9 +356,9 @@  engine_has_run(void)
 }
 
 bool
-engine_aborted(void)
+engine_aborted(struct engine *e)
 {
-    return engine_run_aborted;
+    return e->engine_run_aborted;
 }
 
 void *
@@ -316,14 +377,15 @@  engine_get_internal_data(struct engine_node *node)
 }
 
 void
-engine_init_run(void)
+engine_init_run(struct engine *e)
 {
     VLOG_DBG("Initializing new run");
-    for (size_t i = 0; i < engine_n_nodes; i++) {
-        engine_set_node_state(engine_nodes[i], EN_STALE);
+    for (size_t i = 0; i < e->engine_n_nodes; i++) {
+        engine_set_node_state(e->engine_nodes[i], EN_STALE);
 
-        if (engine_nodes[i]->clear_tracked_data) {
-            engine_nodes[i]->clear_tracked_data(engine_nodes[i]->data);
+        if (e->engine_nodes[i]->clear_tracked_data) {
+            e->engine_nodes[i]->clear_tracked_data(
+                    e->engine_nodes[i]->data);
         }
     }
 }
@@ -397,7 +459,8 @@  engine_compute(struct engine_node *node, bool recompute_allowed)
 }
 
 static void
-engine_run_node(struct engine_node *node, bool recompute_allowed)
+engine_run_node(struct engine *e, struct engine_node *node,
+                bool recompute_allowed)
 {
     if (!node->n_inputs) {
         /* Run the node handler which might change state. */
@@ -406,7 +469,7 @@  engine_run_node(struct engine_node *node, bool recompute_allowed)
         return;
     }
 
-    if (engine_force_recompute) {
+    if (e->engine_force_recompute) {
         engine_recompute(node, recompute_allowed, "forced");
         return;
     }
@@ -447,41 +510,41 @@  engine_run_node(struct engine_node *node, bool recompute_allowed)
 }
 
 void
-engine_run(bool recompute_allowed)
+engine_run(struct engine *e, bool recompute_allowed)
 {
     /* If the last run was aborted skip the incremental run because a
      * recompute is needed first.
      */
-    if (!recompute_allowed && engine_run_aborted) {
+    if (!recompute_allowed && e->engine_run_aborted) {
         return;
     }
 
-    engine_run_aborted = false;
-    for (size_t i = 0; i < engine_n_nodes; i++) {
-        engine_run_node(engine_nodes[i], recompute_allowed);
+    e->engine_run_aborted = false;
+    for (size_t i = 0; i < e->engine_n_nodes; i++) {
+        engine_run_node(e, e->engine_nodes[i], recompute_allowed);
 
-        if (engine_nodes[i]->state == EN_ABORTED) {
-            engine_nodes[i]->stats.abort++;
-            engine_run_aborted = true;
+        if (e->engine_nodes[i]->state == EN_ABORTED) {
+            e->engine_nodes[i]->stats.abort++;
+            e->engine_run_aborted = true;
             return;
         }
     }
 }
 
 bool
-engine_need_run(void)
+engine_need_run(struct engine *e)
 {
-    for (size_t i = 0; i < engine_n_nodes; i++) {
+    for (size_t i = 0; i < e->engine_n_nodes; i++) {
         /* Check only leaf nodes for updates. */
-        if (engine_nodes[i]->n_inputs) {
+        if (e->engine_nodes[i]->n_inputs) {
             continue;
         }
 
-        engine_nodes[i]->run(engine_nodes[i], engine_nodes[i]->data);
-        engine_nodes[i]->stats.recompute++;
-        VLOG_DBG("input node: %s, state: %s", engine_nodes[i]->name,
-                 engine_node_state_name[engine_nodes[i]->state]);
-        if (engine_nodes[i]->state == EN_UPDATED) {
+        e->engine_nodes[i]->run(e->engine_nodes[i], e->engine_nodes[i]->data);
+        e->engine_nodes[i]->stats.recompute++;
+        VLOG_DBG("input node: %s, state: %s", e->engine_nodes[i]->name,
+                 engine_node_state_name[e->engine_nodes[i]->state]);
+        if (e->engine_nodes[i]->state == EN_UPDATED) {
             return true;
         }
     }
@@ -489,9 +552,9 @@  engine_need_run(void)
 }
 
 void
-engine_trigger_recompute(void)
+engine_trigger_recompute(struct engine *e)
 {
     VLOG_INFO("User triggered force recompute.");
-    engine_set_force_recompute(true);
+    engine_set_force_recompute(e, true);
     poll_immediate_wake();
 }
diff --git a/lib/inc-proc-eng.h b/lib/inc-proc-eng.h
index 9bfab1f7c..881242138 100644
--- a/lib/inc-proc-eng.h
+++ b/lib/inc-proc-eng.h
@@ -67,6 +67,7 @@ 
 #include <stdint.h>
 
 #include "compiler.h"
+#include "openvswitch/list.h"
 
 struct engine_context {
     struct ovsdb_idl_txn *ovs_idl_txn;
@@ -122,6 +123,8 @@  struct engine_stats {
 };
 
 struct engine_node {
+    struct engine *e;
+
     /* A unique name for each node. */
     char *name;
 
@@ -173,30 +176,47 @@  struct engine_node {
     struct engine_stats stats;
 };
 
+struct engine {
+    struct ovs_list node;
+
+    const char *name;
+
+    struct engine_node **engine_nodes;
+    size_t engine_n_nodes;
+
+    bool engine_force_recompute;
+    bool engine_run_aborted;
+
+    const struct engine_context *engine_context;
+};
+
+void engine_init_global(void);
+
 /* Initialize the data for the engine nodes. It calls each node's
  * init() method if not NULL passing the user supplied 'arg'.
  * It should be called before the main loop. */
-void engine_init(struct engine_node *node, struct engine_arg *arg);
+struct engine *engine_new(struct engine_node *node, struct engine_arg *arg,
+                          const char *name);
 
 /* Initialize the engine nodes for a new run. It should be called in the
  * main processing loop before every potential engine_run().
  */
-void engine_init_run(void);
+void engine_init_run(struct engine *e);
 
 /* Execute the processing, which should be called in the main loop.
  * Updates the engine node's states accordingly. If 'recompute_allowed' is
  * false and a recompute is required by the current engine run then the engine
  * aborts.
  */
-void engine_run(bool recompute_allowed);
+void engine_run(struct engine *e, bool recompute_allowed);
 
 /* Clean up the data for the engine nodes. It calls each node's
  * cleanup() method if not NULL. It should be called before the program
  * terminates. */
-void engine_cleanup(void);
+void engine_cleanup(struct engine *e);
 
 /* Check if engine needs to run but didn't. */
-bool engine_need_run(void);
+bool engine_need_run(struct engine *e);
 
 /* Get the input node with <name> for <node> */
 struct engine_node * engine_get_input(const char *input_name,
@@ -216,7 +236,7 @@  void engine_add_input(struct engine_node *node, struct engine_node *input,
  * in circumstances when we are not sure there is change or not, or
  * when there is change but the engine couldn't be executed in that
  * iteration, and the change can't be tracked across iterations */
-void engine_set_force_recompute(bool val);
+void engine_set_force_recompute(struct engine *e, bool val);
 
 /* Return the current engine_context. The values in the context can be NULL
  * if the engine is run with allow_recompute == false in the current
@@ -224,9 +244,9 @@  void engine_set_force_recompute(bool val);
  * Therefore, it is the responsibility of the caller to check the context
  * values when called from change handlers.
  */
-const struct engine_context *engine_get_context(void);
+const struct engine_context *engine_get_context(struct engine *e);
 
-void engine_set_context(const struct engine_context *);
+void engine_set_context(struct engine *e, const struct engine_context *);
 
 void engine_set_node_state_at(struct engine_node *node,
                               enum engine_node_state state,
@@ -236,10 +256,10 @@  void engine_set_node_state_at(struct engine_node *node,
 bool engine_node_changed(struct engine_node *node);
 
 /* Return true if the engine has run in the last iteration. */
-bool engine_has_run(void);
+bool engine_has_run(struct engine *e);
 
 /* Returns true if during the last engine run we had to abort processing. */
-bool engine_aborted(void);
+bool engine_aborted(struct engine *e);
 
 /* Return a pointer to node data accessible for users outside the processing
  * engine. If the node data is not valid (e.g., last engine_run() failed or
@@ -265,7 +285,7 @@  void *engine_get_internal_data(struct engine_node *node);
     engine_set_node_state_at(node, state, OVS_SOURCE_LOCATOR)
 
 /* Trigger a full recompute. */
-void engine_trigger_recompute(void);
+void engine_trigger_recompute(struct engine *e);
 
 struct ed_ovsdb_index {
     const char *name;
diff --git a/northd/en-lflow.c b/northd/en-lflow.c
index ffbdaf4e8..5451e0551 100644
--- a/northd/en-lflow.c
+++ b/northd/en-lflow.c
@@ -32,7 +32,7 @@  VLOG_DEFINE_THIS_MODULE(en_lflow);
 
 void en_lflow_run(struct engine_node *node, void *data OVS_UNUSED)
 {
-    const struct engine_context *eng_ctx = engine_get_context();
+    const struct engine_context *eng_ctx = engine_get_context(node->e);
 
     struct lflow_input lflow_input;
 
diff --git a/northd/en-northd.c b/northd/en-northd.c
index 79da7e1c4..064f9d93a 100644
--- a/northd/en-northd.c
+++ b/northd/en-northd.c
@@ -32,7 +32,7 @@  VLOG_DEFINE_THIS_MODULE(en_northd);
 
 void en_northd_run(struct engine_node *node, void *data)
 {
-    const struct engine_context *eng_ctx = engine_get_context();
+    const struct engine_context *eng_ctx = engine_get_context(node->e);
 
     struct northd_input input_data;
 
diff --git a/northd/inc-proc-northd.c b/northd/inc-proc-northd.c
index af55221e3..049fe226a 100644
--- a/northd/inc-proc-northd.c
+++ b/northd/inc-proc-northd.c
@@ -33,6 +33,8 @@ 
 
 VLOG_DEFINE_THIS_MODULE(inc_proc_northd);
 
+static struct engine *flow_engine;
+
 #define NB_NODES \
     NB_NODE(nb_global, "nb_global") \
     NB_NODE(copp, "copp") \
@@ -150,6 +152,8 @@  static ENGINE_NODE(lflow, "lflow");
 void inc_proc_northd_init(struct ovsdb_idl_loop *nb,
                           struct ovsdb_idl_loop *sb)
 {
+    engine_init_global();
+
     /* Define relationships between nodes where first argument is dependent
      * on the second argument */
     engine_add_input(&en_northd, &en_nb_nb_global, NULL);
@@ -229,7 +233,7 @@  void inc_proc_northd_init(struct ovsdb_idl_loop *nb,
     struct ovsdb_idl_index *sbrec_chassis_by_hostname =
         chassis_hostname_index_create(sb->idl);
 
-    engine_init(&en_lflow, &engine_arg);
+    flow_engine = engine_new(&en_lflow, &engine_arg, "flow_engine");
 
     engine_ovsdb_node_add_index(&en_sb_chassis,
                                 "sbrec_chassis_by_name",
@@ -251,14 +255,14 @@  void inc_proc_northd_init(struct ovsdb_idl_loop *nb,
 void inc_proc_northd_run(struct ovsdb_idl_txn *ovnnb_txn,
                          struct ovsdb_idl_txn *ovnsb_txn,
                          bool recompute) {
-    engine_init_run();
+    engine_init_run(flow_engine);
 
     /* Force a full recompute if instructed to, for example, after a NB/SB
      * reconnect event.  However, make sure we don't overwrite an existing
      * force-recompute request if 'recompute' is false.
      */
     if (recompute) {
-        engine_set_force_recompute(recompute);
+        engine_set_force_recompute(flow_engine, recompute);
     }
 
     struct engine_context eng_ctx = {
@@ -266,31 +270,31 @@  void inc_proc_northd_run(struct ovsdb_idl_txn *ovnnb_txn,
         .ovnsb_idl_txn = ovnsb_txn,
     };
 
-    engine_set_context(&eng_ctx);
+    engine_set_context(flow_engine, &eng_ctx);
 
     if (ovnnb_txn && ovnsb_txn) {
-        engine_run(true);
+        engine_run(flow_engine, true);
     }
 
-    if (!engine_has_run()) {
-        if (engine_need_run()) {
+    if (!engine_has_run(flow_engine)) {
+        if (engine_need_run(flow_engine)) {
             VLOG_DBG("engine did not run, force recompute next time.");
-            engine_set_force_recompute(true);
+            engine_set_force_recompute(flow_engine, true);
             poll_immediate_wake();
         } else {
             VLOG_DBG("engine did not run, and it was not needed");
         }
-    } else if (engine_aborted()) {
+    } else if (engine_aborted(flow_engine)) {
         VLOG_DBG("engine was aborted, force recompute next time.");
-        engine_set_force_recompute(true);
+        engine_set_force_recompute(flow_engine, true);
         poll_immediate_wake();
     } else {
-        engine_set_force_recompute(false);
+        engine_set_force_recompute(flow_engine, false);
     }
 }
 
 void inc_proc_northd_cleanup(void)
 {
-    engine_cleanup();
-    engine_set_context(NULL);
+    engine_set_context(flow_engine, NULL);
+    engine_cleanup(flow_engine);
 }