diff mbox series

[ovs-dev] inc-proc-eng: move inc-proc code in an isolated strucuture

Message ID 64909d26651cefb3e1a5d33c4908422c57aaac76.1638556405.git.lorenzo.bianconi@redhat.com
State Changes Requested
Headers show
Series [ovs-dev] inc-proc-eng: move inc-proc code in an isolated strucuture | expand

Checks

Context Check Description
ovsrobot/apply-robot success apply and check: success
ovsrobot/github-robot-_Build_and_Test success github build: passed
ovsrobot/github-robot-_ovn-kubernetes success github build: passed

Commit Message

Lorenzo Bianconi Dec. 3, 2021, 6:53 p.m. UTC
Remove global state variables and move inc-proc code into an isolated
structure. This is a preliminary patch to add the capability to run
multiple inc-proc engines.

Signed-off-by: Lorenzo Bianconi <lorenzo.bianconi@redhat.com>
---
 controller/ovn-controller.c |  65 +++++++++-------
 lib/inc-proc-eng.c          | 151 +++++++++++++++++++-----------------
 lib/inc-proc-eng.h          |  35 ++++++---
 northd/en-lflow.c           |   2 +-
 northd/en-northd.c          |   2 +-
 northd/inc-proc-northd.c    |  28 +++----
 6 files changed, 155 insertions(+), 128 deletions(-)

Comments

Dumitru Ceara Dec. 8, 2021, 1:50 p.m. UTC | #1
Hi Lorenzo,

Typo in the subject: "strucuture".

On 12/3/21 19:53, Lorenzo Bianconi wrote:
> Remove global state variables and move inc-proc code into an isolated
> structure. This is a preliminary patch to add the capability to run
> multiple inc-proc engines.

Overall this looks OK to me; there is however an issue with the unixctl
commands, please see below.

> 
> Signed-off-by: Lorenzo Bianconi <lorenzo.bianconi@redhat.com>
> ---
>  controller/ovn-controller.c |  65 +++++++++-------
>  lib/inc-proc-eng.c          | 151 +++++++++++++++++++-----------------
>  lib/inc-proc-eng.h          |  35 ++++++---
>  northd/en-lflow.c           |   2 +-
>  northd/en-northd.c          |   2 +-
>  northd/inc-proc-northd.c    |  28 +++----
>  6 files changed, 155 insertions(+), 128 deletions(-)
> 

[...]

> diff --git a/lib/inc-proc-eng.c b/lib/inc-proc-eng.c
> index 2958a55e3..86d2df520 100644
> --- a/lib/inc-proc-eng.c
> +++ b/lib/inc-proc-eng.c

[...]

> @@ -145,51 +141,58 @@ engine_dump_stats(struct unixctl_conn *conn, int argc OVS_UNUSED,
>  
>  static void
>  engine_trigger_recompute_cmd(struct unixctl_conn *conn, int argc OVS_UNUSED,
> -                             const char *argv[] OVS_UNUSED,
> -                             void *arg OVS_UNUSED)
> +                             const char *argv[] OVS_UNUSED, void *arg)
>  {
> -    engine_trigger_recompute();
> +    struct engine *e = arg;
> +
> +    engine_trigger_recompute(e);
>      unixctl_command_reply(conn, NULL);
>  }
>  
> -void
> -engine_init(struct engine_node *node, struct engine_arg *arg)
> +void engine_init(struct engine **pe, struct engine_node *node,
> +                 struct engine_arg *arg)
>  {
> -    engine_nodes = engine_get_nodes(node, &engine_n_nodes);
> +    struct engine *e = xzalloc(sizeof *e);
> +
> +    e->engine_nodes = engine_get_nodes(node, &e->engine_n_nodes);
>  
> -    for (size_t i = 0; i < engine_n_nodes; i++) {
> -        if (engine_nodes[i]->init) {
> -            engine_nodes[i]->data =
> -                engine_nodes[i]->init(engine_nodes[i], arg);
> +    for (size_t i = 0; i < e->engine_n_nodes; i++) {
> +        if (e->engine_nodes[i]->init) {
> +            e->engine_nodes[i]->data =
> +                e->engine_nodes[i]->init(e->engine_nodes[i], arg);
>          } else {
> -            engine_nodes[i]->data = NULL;
> +            e->engine_nodes[i]->data = NULL;
>          }
> +        e->engine_nodes[i]->e = e;
>      }
>  
>      unixctl_command_register("inc-engine/show-stats", "", 0, 0,
> -                             engine_dump_stats, NULL);
> +                             engine_dump_stats, e);
>      unixctl_command_register("inc-engine/clear-stats", "", 0, 0,
> -                             engine_clear_stats, NULL);
> +                             engine_clear_stats, e);
>      unixctl_command_register("inc-engine/recompute", "", 0, 0,
> -                             engine_trigger_recompute_cmd, NULL);
> +                             engine_trigger_recompute_cmd, e);

This won't work as expected if there are multiple incremental processing
engines.  The commands will only be registered for the first one.  We
need to find a different way to do this.

Regards,
Dumitru
diff mbox series

Patch

diff --git a/controller/ovn-controller.c b/controller/ovn-controller.c
index 26593bc0d..124877c23 100644
--- a/controller/ovn-controller.c
+++ b/controller/ovn-controller.c
@@ -113,6 +113,8 @@  static unixctl_cb_func debug_delay_nb_cfg_report;
 #define OVS_NB_CFG_TS_NAME "ovn-nb-cfg-ts"
 #define OVS_STARTUP_TS_NAME "ovn-startup-ts"
 
+static struct engine *flow_engine;
+
 static char *parse_options(int argc, char *argv[]);
 OVS_NO_RETURN static void usage(void);
 
@@ -556,7 +558,7 @@  update_sb_db(struct ovsdb_idl *ovs_idl, struct ovsdb_idl *ovnsb_idl,
     }
     if (reset_ovnsb_idl_min_index && *reset_ovnsb_idl_min_index) {
         VLOG_INFO("Resetting southbound database cluster state");
-        engine_set_force_recompute(true);
+        engine_set_force_recompute(flow_engine, true);
         ovsdb_idl_reset_min_index(ovnsb_idl);
         *reset_ovnsb_idl_min_index = false;
     }
@@ -1010,7 +1012,8 @@  en_ofctrl_is_connected_cleanup(void *data OVS_UNUSED)
 static void
 en_ofctrl_is_connected_run(struct engine_node *node, void *data)
 {
-    struct controller_engine_ctx *ctrl_ctx = engine_get_context()->client_ctx;
+    struct controller_engine_ctx *ctrl_ctx =
+        engine_get_context(flow_engine)->client_ctx;
     struct ed_type_ofctrl_is_connected *of_data = data;
     if (of_data->connected != ofctrl_is_connected()) {
         of_data->connected = !of_data->connected;
@@ -1225,10 +1228,11 @@  init_binding_ctx(struct engine_node *node,
                 engine_get_input("SB_port_binding", node),
                 "datapath");
 
-    struct controller_engine_ctx *ctrl_ctx = engine_get_context()->client_ctx;
+    struct controller_engine_ctx *ctrl_ctx =
+        engine_get_context(flow_engine)->client_ctx;
 
-    b_ctx_in->ovnsb_idl_txn = engine_get_context()->ovnsb_idl_txn;
-    b_ctx_in->ovs_idl_txn = engine_get_context()->ovs_idl_txn;
+    b_ctx_in->ovnsb_idl_txn = engine_get_context(flow_engine)->ovnsb_idl_txn;
+    b_ctx_in->ovs_idl_txn = engine_get_context(flow_engine)->ovs_idl_txn;
     b_ctx_in->sbrec_datapath_binding_by_key = sbrec_datapath_binding_by_key;
     b_ctx_in->sbrec_port_binding_by_datapath = sbrec_port_binding_by_datapath;
     b_ctx_in->sbrec_port_binding_by_name = sbrec_port_binding_by_name;
@@ -2386,7 +2390,8 @@  en_lflow_output_run(struct engine_node *node, void *data)
         lflow_conj_ids_clear(&fo->conj_ids);
     }
 
-    struct controller_engine_ctx *ctrl_ctx = engine_get_context()->client_ctx;
+    struct controller_engine_ctx *ctrl_ctx =
+        engine_get_context(flow_engine)->client_ctx;
 
     fo->pd.lflow_cache = ctrl_ctx->lflow_cache;
 
@@ -3039,7 +3044,7 @@  check_northd_version(struct ovsdb_idl *ovs_idl, struct ovsdb_idl *ovnsb_idl,
      * full recompute.
      */
     if (version_mismatch) {
-        engine_set_force_recompute(true);
+        engine_set_force_recompute(flow_engine, true);
     }
     version_mismatch = false;
     return true;
@@ -3343,7 +3348,7 @@  main(int argc, char *argv[])
         .sb_idl = ovnsb_idl_loop.idl,
         .ovs_idl = ovs_idl_loop.idl,
     };
-    engine_init(&en_flow_output, &engine_arg);
+    engine_init(&flow_engine, &en_flow_output, &engine_arg);
 
     engine_ovsdb_node_add_index(&en_sb_chassis, "name", sbrec_chassis_by_name);
     engine_ovsdb_node_add_index(&en_sb_multicast_group, "name_datapath",
@@ -3395,7 +3400,7 @@  main(int argc, char *argv[])
 
     unixctl_command_register("recompute", "[deprecated]", 0, 0,
                              engine_recompute_cmd,
-                             NULL);
+                             flow_engine);
     unixctl_command_register("lflow-cache/flush", "", 0, 0,
                              lflow_cache_flush_cmd,
                              &lflow_output_data->pd);
@@ -3475,7 +3480,7 @@  main(int argc, char *argv[])
             goto loop_done;
         }
 
-        engine_init_run();
+        engine_init_run(flow_engine);
 
         struct ovsdb_idl_txn *ovs_idl_txn = ovsdb_idl_loop_run(&ovs_idl_loop);
         unsigned int new_ovs_cond_seqno
@@ -3483,7 +3488,7 @@  main(int argc, char *argv[])
         if (new_ovs_cond_seqno != ovs_cond_seqno) {
             if (!new_ovs_cond_seqno) {
                 VLOG_INFO("OVS IDL reconnected, force recompute.");
-                engine_set_force_recompute(true);
+                engine_set_force_recompute(flow_engine, true);
             }
             ovs_cond_seqno = new_ovs_cond_seqno;
         }
@@ -3501,7 +3506,7 @@  main(int argc, char *argv[])
         if (new_ovnsb_cond_seqno != ovnsb_cond_seqno) {
             if (!new_ovnsb_cond_seqno) {
                 VLOG_INFO("OVNSB IDL reconnected, force recompute.");
-                engine_set_force_recompute(true);
+                engine_set_force_recompute(flow_engine, true);
                 vif_plug_reset_idl_prime_counter();
             }
             ovnsb_cond_seqno = new_ovnsb_cond_seqno;
@@ -3513,7 +3518,7 @@  main(int argc, char *argv[])
             .client_ctx = &ctrl_engine_ctx
         };
 
-        engine_set_context(&eng_ctx);
+        engine_set_context(flow_engine, &eng_ctx);
 
         bool northd_version_match =
             check_northd_version(ovs_idl_loop.idl, ovnsb_idl_loop.idl,
@@ -3579,7 +3584,7 @@  main(int argc, char *argv[])
                                            &br_int_dp->capabilities : NULL,
                                            br_int ? br_int->name : NULL)) {
                 VLOG_INFO("OVS feature set changed, force recompute.");
-                engine_set_force_recompute(true);
+                engine_set_force_recompute(flow_engine, true);
             }
 
             if (br_int) {
@@ -3614,9 +3619,9 @@  main(int argc, char *argv[])
                              * this round of engine_run and continue processing
                              * acculated changes incrementally later when
                              * ofctrl_can_put() returns true. */
-                            engine_run(false);
+                            engine_run(flow_engine, false);
                         } else {
-                            engine_run(true);
+                            engine_run(flow_engine, true);
                         }
                     } else {
                         /* Even if there's no SB DB transaction available,
@@ -3625,7 +3630,7 @@  main(int argc, char *argv[])
                          * If a recompute is required, the engine will abort,
                          * triggerring a full run in the next iteration.
                          */
-                        engine_run(false);
+                        engine_run(flow_engine, false);
                     }
                     stopwatch_stop(CONTROLLER_LOOP_STOPWATCH_NAME,
                                    time_msec());
@@ -3770,24 +3775,24 @@  main(int argc, char *argv[])
 
             }
 
-            if (!engine_has_run()) {
-                if (engine_need_run()) {
+            if (!engine_has_run(flow_engine)) {
+                if (engine_need_run(flow_engine)) {
                     VLOG_DBG("engine did not run, force recompute next time: "
                              "br_int %p, chassis %p", br_int, chassis);
-                    engine_set_force_recompute(true);
+                    engine_set_force_recompute(flow_engine, true);
                     poll_immediate_wake();
                 } else {
                     VLOG_DBG("engine did not run, and it was not needed"
                              " either: br_int %p, chassis %p",
                              br_int, chassis);
                 }
-            } else if (engine_aborted()) {
+            } else if (engine_aborted(flow_engine)) {
                 VLOG_DBG("engine was aborted, force recompute next time: "
                          "br_int %p, chassis %p", br_int, chassis);
-                engine_set_force_recompute(true);
+                engine_set_force_recompute(flow_engine, true);
                 poll_immediate_wake();
             } else {
-                engine_set_force_recompute(false);
+                engine_set_force_recompute(flow_engine, false);
             }
 
             store_nb_cfg(ovnsb_idl_txn, ovs_idl_txn, chassis_private,
@@ -3841,7 +3846,7 @@  main(int argc, char *argv[])
 
         if (!ovsdb_idl_loop_commit_and_wait(&ovnsb_idl_loop)) {
             VLOG_INFO("OVNSB commit failed, force recompute next time.");
-            engine_set_force_recompute(true);
+            engine_set_force_recompute(flow_engine, true);
         }
 
         int ovs_txn_status = ovsdb_idl_loop_commit_and_wait(&ovs_idl_loop);
@@ -3891,8 +3896,8 @@  loop_done:
         }
     }
 
-    engine_set_context(NULL);
-    engine_cleanup();
+    engine_set_context(flow_engine, NULL);
+    engine_cleanup(&flow_engine);
 
     /* It's time to exit.  Clean up the databases if we are not restarting */
     if (!restart) {
@@ -4147,9 +4152,11 @@  inject_pkt(struct unixctl_conn *conn, int argc OVS_UNUSED,
 
 static void
 engine_recompute_cmd(struct unixctl_conn *conn OVS_UNUSED, int argc OVS_UNUSED,
-                     const char *argv[] OVS_UNUSED, void *arg OVS_UNUSED)
+                     const char *argv[] OVS_UNUSED, void *arg)
 {
-    engine_trigger_recompute();
+    struct engine *e = arg;
+
+    engine_trigger_recompute(e);
     unixctl_command_reply(conn, NULL);
 }
 
@@ -4161,7 +4168,7 @@  lflow_cache_flush_cmd(struct unixctl_conn *conn OVS_UNUSED,
     VLOG_INFO("User triggered lflow cache flush.");
     struct lflow_output_persistent_data *fo_pd = arg_;
     lflow_cache_flush(fo_pd->lflow_cache);
-    engine_set_force_recompute(true);
+    engine_set_force_recompute(flow_engine, true);
     poll_immediate_wake();
     unixctl_command_reply(conn, NULL);
 }
diff --git a/lib/inc-proc-eng.c b/lib/inc-proc-eng.c
index 2958a55e3..86d2df520 100644
--- a/lib/inc-proc-eng.c
+++ b/lib/inc-proc-eng.c
@@ -33,13 +33,6 @@ 
 
 VLOG_DEFINE_THIS_MODULE(inc_proc_eng);
 
-static bool engine_force_recompute = false;
-static bool engine_run_aborted = false;
-static const struct engine_context *engine_context;
-
-static struct engine_node **engine_nodes;
-static size_t engine_n_nodes;
-
 static const char *engine_node_state_name[EN_STATE_MAX] = {
     [EN_STALE]     = "Stale",
     [EN_UPDATED]   = "Updated",
@@ -52,21 +45,21 @@  engine_recompute(struct engine_node *node, bool allowed,
                  const char *reason_fmt, ...) OVS_PRINTF_FORMAT(3, 4);
 
 void
-engine_set_force_recompute(bool val)
+engine_set_force_recompute(struct engine *e, bool val)
 {
-    engine_force_recompute = val;
+    e->engine_force_recompute = val;
 }
 
 const struct engine_context *
-engine_get_context(void)
+engine_get_context(struct engine *e)
 {
-    return engine_context;
+    return e->engine_context;
 }
 
 void
-engine_set_context(const struct engine_context *ctx)
+engine_set_context(struct engine *e, const struct engine_context *ctx)
 {
-    engine_context = ctx;
+    e->engine_context = ctx;
 }
 
 /* Builds the topologically sorted 'sorted_nodes' array starting from
@@ -111,10 +104,12 @@  engine_get_nodes(struct engine_node *node, size_t *n_count)
 
 static void
 engine_clear_stats(struct unixctl_conn *conn, int argc OVS_UNUSED,
-                   const char *argv[] OVS_UNUSED, void *arg OVS_UNUSED)
+                   const char *argv[] OVS_UNUSED, void *arg)
 {
-    for (size_t i = 0; i < engine_n_nodes; i++) {
-        struct engine_node *node = engine_nodes[i];
+    struct engine *e = arg;
+
+    for (size_t i = 0; i < e->engine_n_nodes; i++) {
+        struct engine_node *node = e->engine_nodes[i];
 
         memset(&node->stats, 0, sizeof node->stats);
     }
@@ -123,12 +118,13 @@  engine_clear_stats(struct unixctl_conn *conn, int argc OVS_UNUSED,
 
 static void
 engine_dump_stats(struct unixctl_conn *conn, int argc OVS_UNUSED,
-                  const char *argv[] OVS_UNUSED, void *arg OVS_UNUSED)
+                  const char *argv[] OVS_UNUSED, void *arg)
 {
     struct ds dump = DS_EMPTY_INITIALIZER;
+    struct engine *e = arg;
 
-    for (size_t i = 0; i < engine_n_nodes; i++) {
-        struct engine_node *node = engine_nodes[i];
+    for (size_t i = 0; i < e->engine_n_nodes; i++) {
+        struct engine_node *node = e->engine_nodes[i];
 
         ds_put_format(&dump,
                       "Node: %s\n"
@@ -145,51 +141,58 @@  engine_dump_stats(struct unixctl_conn *conn, int argc OVS_UNUSED,
 
 static void
 engine_trigger_recompute_cmd(struct unixctl_conn *conn, int argc OVS_UNUSED,
-                             const char *argv[] OVS_UNUSED,
-                             void *arg OVS_UNUSED)
+                             const char *argv[] OVS_UNUSED, void *arg)
 {
-    engine_trigger_recompute();
+    struct engine *e = arg;
+
+    engine_trigger_recompute(e);
     unixctl_command_reply(conn, NULL);
 }
 
-void
-engine_init(struct engine_node *node, struct engine_arg *arg)
+void engine_init(struct engine **pe, struct engine_node *node,
+                 struct engine_arg *arg)
 {
-    engine_nodes = engine_get_nodes(node, &engine_n_nodes);
+    struct engine *e = xzalloc(sizeof *e);
+
+    e->engine_nodes = engine_get_nodes(node, &e->engine_n_nodes);
 
-    for (size_t i = 0; i < engine_n_nodes; i++) {
-        if (engine_nodes[i]->init) {
-            engine_nodes[i]->data =
-                engine_nodes[i]->init(engine_nodes[i], arg);
+    for (size_t i = 0; i < e->engine_n_nodes; i++) {
+        if (e->engine_nodes[i]->init) {
+            e->engine_nodes[i]->data =
+                e->engine_nodes[i]->init(e->engine_nodes[i], arg);
         } else {
-            engine_nodes[i]->data = NULL;
+            e->engine_nodes[i]->data = NULL;
         }
+        e->engine_nodes[i]->e = e;
     }
 
     unixctl_command_register("inc-engine/show-stats", "", 0, 0,
-                             engine_dump_stats, NULL);
+                             engine_dump_stats, e);
     unixctl_command_register("inc-engine/clear-stats", "", 0, 0,
-                             engine_clear_stats, NULL);
+                             engine_clear_stats, e);
     unixctl_command_register("inc-engine/recompute", "", 0, 0,
-                             engine_trigger_recompute_cmd, NULL);
+                             engine_trigger_recompute_cmd, e);
+    *pe = e;
 }
 
 void
-engine_cleanup(void)
+engine_cleanup(struct engine **pe)
 {
-    for (size_t i = 0; i < engine_n_nodes; i++) {
-        if (engine_nodes[i]->clear_tracked_data) {
-            engine_nodes[i]->clear_tracked_data(engine_nodes[i]->data);
+    struct engine *e = *pe;
+    for (size_t i = 0; i < e->engine_n_nodes; i++) {
+        if (e->engine_nodes[i]->clear_tracked_data) {
+            e->engine_nodes[i]->clear_tracked_data(
+                    e->engine_nodes[i]->data);
         }
 
-        if (engine_nodes[i]->cleanup) {
-            engine_nodes[i]->cleanup(engine_nodes[i]->data);
+        if (e->engine_nodes[i]->cleanup) {
+            e->engine_nodes[i]->cleanup(e->engine_nodes[i]->data);
         }
-        free(engine_nodes[i]->data);
+        free(e->engine_nodes[i]->data);
     }
-    free(engine_nodes);
-    engine_nodes = NULL;
-    engine_n_nodes = 0;
+    e->engine_n_nodes = 0;
+    free(e->engine_nodes);
+    *pe = NULL;
 }
 
 struct engine_node *
@@ -284,10 +287,10 @@  engine_node_changed(struct engine_node *node)
 }
 
 bool
-engine_has_run(void)
+engine_has_run(struct engine *e)
 {
-    for (size_t i = 0; i < engine_n_nodes; i++) {
-        if (engine_nodes[i]->state != EN_STALE) {
+    for (size_t i = 0; i < e->engine_n_nodes; i++) {
+        if (e->engine_nodes[i]->state != EN_STALE) {
             return true;
         }
     }
@@ -295,9 +298,9 @@  engine_has_run(void)
 }
 
 bool
-engine_aborted(void)
+engine_aborted(struct engine *e)
 {
-    return engine_run_aborted;
+    return e->engine_run_aborted;
 }
 
 void *
@@ -316,14 +319,15 @@  engine_get_internal_data(struct engine_node *node)
 }
 
 void
-engine_init_run(void)
+engine_init_run(struct engine *e)
 {
     VLOG_DBG("Initializing new run");
-    for (size_t i = 0; i < engine_n_nodes; i++) {
-        engine_set_node_state(engine_nodes[i], EN_STALE);
+    for (size_t i = 0; i < e->engine_n_nodes; i++) {
+        engine_set_node_state(e->engine_nodes[i], EN_STALE);
 
-        if (engine_nodes[i]->clear_tracked_data) {
-            engine_nodes[i]->clear_tracked_data(engine_nodes[i]->data);
+        if (e->engine_nodes[i]->clear_tracked_data) {
+            e->engine_nodes[i]->clear_tracked_data(
+                    e->engine_nodes[i]->data);
         }
     }
 }
@@ -397,7 +401,8 @@  engine_compute(struct engine_node *node, bool recompute_allowed)
 }
 
 static void
-engine_run_node(struct engine_node *node, bool recompute_allowed)
+engine_run_node(struct engine *e, struct engine_node *node,
+                bool recompute_allowed)
 {
     if (!node->n_inputs) {
         /* Run the node handler which might change state. */
@@ -406,7 +411,7 @@  engine_run_node(struct engine_node *node, bool recompute_allowed)
         return;
     }
 
-    if (engine_force_recompute) {
+    if (e->engine_force_recompute) {
         engine_recompute(node, recompute_allowed, "forced");
         return;
     }
@@ -447,41 +452,41 @@  engine_run_node(struct engine_node *node, bool recompute_allowed)
 }
 
 void
-engine_run(bool recompute_allowed)
+engine_run(struct engine *e, bool recompute_allowed)
 {
     /* If the last run was aborted skip the incremental run because a
      * recompute is needed first.
      */
-    if (!recompute_allowed && engine_run_aborted) {
+    if (!recompute_allowed && e->engine_run_aborted) {
         return;
     }
 
-    engine_run_aborted = false;
-    for (size_t i = 0; i < engine_n_nodes; i++) {
-        engine_run_node(engine_nodes[i], recompute_allowed);
+    e->engine_run_aborted = false;
+    for (size_t i = 0; i < e->engine_n_nodes; i++) {
+        engine_run_node(e, e->engine_nodes[i], recompute_allowed);
 
-        if (engine_nodes[i]->state == EN_ABORTED) {
-            engine_nodes[i]->stats.abort++;
-            engine_run_aborted = true;
+        if (e->engine_nodes[i]->state == EN_ABORTED) {
+            e->engine_nodes[i]->stats.abort++;
+            e->engine_run_aborted = true;
             return;
         }
     }
 }
 
 bool
-engine_need_run(void)
+engine_need_run(struct engine *e)
 {
-    for (size_t i = 0; i < engine_n_nodes; i++) {
+    for (size_t i = 0; i < e->engine_n_nodes; i++) {
         /* Check only leaf nodes for updates. */
-        if (engine_nodes[i]->n_inputs) {
+        if (e->engine_nodes[i]->n_inputs) {
             continue;
         }
 
-        engine_nodes[i]->run(engine_nodes[i], engine_nodes[i]->data);
-        engine_nodes[i]->stats.recompute++;
-        VLOG_DBG("input node: %s, state: %s", engine_nodes[i]->name,
-                 engine_node_state_name[engine_nodes[i]->state]);
-        if (engine_nodes[i]->state == EN_UPDATED) {
+        e->engine_nodes[i]->run(e->engine_nodes[i], e->engine_nodes[i]->data);
+        e->engine_nodes[i]->stats.recompute++;
+        VLOG_DBG("input node: %s, state: %s", e->engine_nodes[i]->name,
+                 engine_node_state_name[e->engine_nodes[i]->state]);
+        if (e->engine_nodes[i]->state == EN_UPDATED) {
             return true;
         }
     }
@@ -489,9 +494,9 @@  engine_need_run(void)
 }
 
 void
-engine_trigger_recompute(void)
+engine_trigger_recompute(struct engine *e)
 {
     VLOG_INFO("User triggered force recompute.");
-    engine_set_force_recompute(true);
+    engine_set_force_recompute(e, true);
     poll_immediate_wake();
 }
diff --git a/lib/inc-proc-eng.h b/lib/inc-proc-eng.h
index 9bfab1f7c..066e7b9c5 100644
--- a/lib/inc-proc-eng.h
+++ b/lib/inc-proc-eng.h
@@ -122,6 +122,8 @@  struct engine_stats {
 };
 
 struct engine_node {
+    struct engine *e;
+
     /* A unique name for each node. */
     char *name;
 
@@ -173,30 +175,41 @@  struct engine_node {
     struct engine_stats stats;
 };
 
+struct engine {
+    struct engine_node **engine_nodes;
+    size_t engine_n_nodes;
+
+    bool engine_force_recompute;
+    bool engine_run_aborted;
+
+    const struct engine_context *engine_context;
+};
+
 /* Initialize the data for the engine nodes. It calls each node's
  * init() method if not NULL passing the user supplied 'arg'.
  * It should be called before the main loop. */
-void engine_init(struct engine_node *node, struct engine_arg *arg);
+void engine_init(struct engine **pe, struct engine_node *node,
+                 struct engine_arg *arg);
 
 /* Initialize the engine nodes for a new run. It should be called in the
  * main processing loop before every potential engine_run().
  */
-void engine_init_run(void);
+void engine_init_run(struct engine *e);
 
 /* Execute the processing, which should be called in the main loop.
  * Updates the engine node's states accordingly. If 'recompute_allowed' is
  * false and a recompute is required by the current engine run then the engine
  * aborts.
  */
-void engine_run(bool recompute_allowed);
+void engine_run(struct engine *e, bool recompute_allowed);
 
 /* Clean up the data for the engine nodes. It calls each node's
  * cleanup() method if not NULL. It should be called before the program
  * terminates. */
-void engine_cleanup(void);
+void engine_cleanup(struct engine **pe);
 
 /* Check if engine needs to run but didn't. */
-bool engine_need_run(void);
+bool engine_need_run(struct engine *e);
 
 /* Get the input node with <name> for <node> */
 struct engine_node * engine_get_input(const char *input_name,
@@ -216,7 +229,7 @@  void engine_add_input(struct engine_node *node, struct engine_node *input,
  * in circumstances when we are not sure there is change or not, or
  * when there is change but the engine couldn't be executed in that
  * iteration, and the change can't be tracked across iterations */
-void engine_set_force_recompute(bool val);
+void engine_set_force_recompute(struct engine *e, bool val);
 
 /* Return the current engine_context. The values in the context can be NULL
  * if the engine is run with allow_recompute == false in the current
@@ -224,9 +237,9 @@  void engine_set_force_recompute(bool val);
  * Therefore, it is the responsibility of the caller to check the context
  * values when called from change handlers.
  */
-const struct engine_context *engine_get_context(void);
+const struct engine_context *engine_get_context(struct engine *e);
 
-void engine_set_context(const struct engine_context *);
+void engine_set_context(struct engine *e, const struct engine_context *);
 
 void engine_set_node_state_at(struct engine_node *node,
                               enum engine_node_state state,
@@ -236,10 +249,10 @@  void engine_set_node_state_at(struct engine_node *node,
 bool engine_node_changed(struct engine_node *node);
 
 /* Return true if the engine has run in the last iteration. */
-bool engine_has_run(void);
+bool engine_has_run(struct engine *e);
 
 /* Returns true if during the last engine run we had to abort processing. */
-bool engine_aborted(void);
+bool engine_aborted(struct engine *e);
 
 /* Return a pointer to node data accessible for users outside the processing
  * engine. If the node data is not valid (e.g., last engine_run() failed or
@@ -265,7 +278,7 @@  void *engine_get_internal_data(struct engine_node *node);
     engine_set_node_state_at(node, state, OVS_SOURCE_LOCATOR)
 
 /* Trigger a full recompute. */
-void engine_trigger_recompute(void);
+void engine_trigger_recompute(struct engine *e);
 
 struct ed_ovsdb_index {
     const char *name;
diff --git a/northd/en-lflow.c b/northd/en-lflow.c
index ffbdaf4e8..5451e0551 100644
--- a/northd/en-lflow.c
+++ b/northd/en-lflow.c
@@ -32,7 +32,7 @@  VLOG_DEFINE_THIS_MODULE(en_lflow);
 
 void en_lflow_run(struct engine_node *node, void *data OVS_UNUSED)
 {
-    const struct engine_context *eng_ctx = engine_get_context();
+    const struct engine_context *eng_ctx = engine_get_context(node->e);
 
     struct lflow_input lflow_input;
 
diff --git a/northd/en-northd.c b/northd/en-northd.c
index 79da7e1c4..064f9d93a 100644
--- a/northd/en-northd.c
+++ b/northd/en-northd.c
@@ -32,7 +32,7 @@  VLOG_DEFINE_THIS_MODULE(en_northd);
 
 void en_northd_run(struct engine_node *node, void *data)
 {
-    const struct engine_context *eng_ctx = engine_get_context();
+    const struct engine_context *eng_ctx = engine_get_context(node->e);
 
     struct northd_input input_data;
 
diff --git a/northd/inc-proc-northd.c b/northd/inc-proc-northd.c
index af55221e3..83b344dd1 100644
--- a/northd/inc-proc-northd.c
+++ b/northd/inc-proc-northd.c
@@ -33,6 +33,8 @@ 
 
 VLOG_DEFINE_THIS_MODULE(inc_proc_northd);
 
+static struct engine *flow_engine;
+
 #define NB_NODES \
     NB_NODE(nb_global, "nb_global") \
     NB_NODE(copp, "copp") \
@@ -229,7 +231,7 @@  void inc_proc_northd_init(struct ovsdb_idl_loop *nb,
     struct ovsdb_idl_index *sbrec_chassis_by_hostname =
         chassis_hostname_index_create(sb->idl);
 
-    engine_init(&en_lflow, &engine_arg);
+    engine_init(&flow_engine, &en_lflow, &engine_arg);
 
     engine_ovsdb_node_add_index(&en_sb_chassis,
                                 "sbrec_chassis_by_name",
@@ -251,14 +253,14 @@  void inc_proc_northd_init(struct ovsdb_idl_loop *nb,
 void inc_proc_northd_run(struct ovsdb_idl_txn *ovnnb_txn,
                          struct ovsdb_idl_txn *ovnsb_txn,
                          bool recompute) {
-    engine_init_run();
+    engine_init_run(flow_engine);
 
     /* Force a full recompute if instructed to, for example, after a NB/SB
      * reconnect event.  However, make sure we don't overwrite an existing
      * force-recompute request if 'recompute' is false.
      */
     if (recompute) {
-        engine_set_force_recompute(recompute);
+        engine_set_force_recompute(flow_engine, recompute);
     }
 
     struct engine_context eng_ctx = {
@@ -266,31 +268,31 @@  void inc_proc_northd_run(struct ovsdb_idl_txn *ovnnb_txn,
         .ovnsb_idl_txn = ovnsb_txn,
     };
 
-    engine_set_context(&eng_ctx);
+    engine_set_context(flow_engine, &eng_ctx);
 
     if (ovnnb_txn && ovnsb_txn) {
-        engine_run(true);
+        engine_run(flow_engine, true);
     }
 
-    if (!engine_has_run()) {
-        if (engine_need_run()) {
+    if (!engine_has_run(flow_engine)) {
+        if (engine_need_run(flow_engine)) {
             VLOG_DBG("engine did not run, force recompute next time.");
-            engine_set_force_recompute(true);
+            engine_set_force_recompute(flow_engine, true);
             poll_immediate_wake();
         } else {
             VLOG_DBG("engine did not run, and it was not needed");
         }
-    } else if (engine_aborted()) {
+    } else if (engine_aborted(flow_engine)) {
         VLOG_DBG("engine was aborted, force recompute next time.");
-        engine_set_force_recompute(true);
+        engine_set_force_recompute(flow_engine, true);
         poll_immediate_wake();
     } else {
-        engine_set_force_recompute(false);
+        engine_set_force_recompute(flow_engine, false);
     }
 }
 
 void inc_proc_northd_cleanup(void)
 {
-    engine_cleanup();
-    engine_set_context(NULL);
+    engine_set_context(flow_engine, NULL);
+    engine_cleanup(&flow_engine);
 }