diff mbox series

[ovs-dev,v6,ovn,1/4] ovn-controller: Refactor I-P engine_run() tracking.

Message ID 20191122161318.4719.87657.stgit@dceara.remote.csb
State Accepted
Headers show
Series Refactor I-P engine and fix use after free. | expand

Commit Message

Dumitru Ceara Nov. 22, 2019, 4:13 p.m. UTC
This commit simplifies the logic of calling engine_run and engine_need_run in
order to reduce the number of external variables required to track the result
of the last engine execution.

The engine code is also refactored a bit and the engine_run() function is
split in different functions that handle computing/recomputing a node.

Signed-off-by: Dumitru Ceara <dceara@redhat.com>
---
 controller/ovn-controller.c |   33 ++++++------
 lib/inc-proc-eng.c          |  120 +++++++++++++++++++++++++++++--------------
 lib/inc-proc-eng.h          |    7 ++-
 3 files changed, 103 insertions(+), 57 deletions(-)

Comments

Han Zhou Nov. 26, 2019, 11:26 p.m. UTC | #1
On Fri, Nov 22, 2019 at 8:13 AM Dumitru Ceara <dceara@redhat.com> wrote:
>
> This commit simplifies the logic of calling engine_run and
engine_need_run in
> order to reduce the number of external variables required to track the
result
> of the last engine execution.
>
> The engine code is also refactored a bit and the engine_run() function is
> split in different functions that handle computing/recomputing a node.
>
> Signed-off-by: Dumitru Ceara <dceara@redhat.com>
> ---
>  controller/ovn-controller.c |   33 ++++++------
>  lib/inc-proc-eng.c          |  120
+++++++++++++++++++++++++++++--------------
>  lib/inc-proc-eng.h          |    7 ++-
>  3 files changed, 103 insertions(+), 57 deletions(-)
>
> diff --git a/controller/ovn-controller.c b/controller/ovn-controller.c
> index 27cb488..c56190f 100644
> --- a/controller/ovn-controller.c
> +++ b/controller/ovn-controller.c
> @@ -1942,7 +1942,6 @@ main(int argc, char *argv[])
>                               &pending_pkt);
>
>      uint64_t engine_run_id = 0;
> -    uint64_t old_engine_run_id = 0;
>      bool engine_run_done = true;
>
>      unsigned int ovs_cond_seqno = UINT_MAX;
> @@ -1952,10 +1951,11 @@ main(int argc, char *argv[])
>      exiting = false;
>      restart = false;
>      while (!exiting) {
> +        engine_run_id++;
> +
>          update_sb_db(ovs_idl_loop.idl, ovnsb_idl_loop.idl);
>          update_ssl_config(ovsrec_ssl_table_get(ovs_idl_loop.idl));
>
 ofctrl_set_probe_interval(get_ofctrl_probe_interval(ovs_idl_loop.idl));
> -        old_engine_run_id = engine_run_id;
>
>          struct ovsdb_idl_txn *ovs_idl_txn =
ovsdb_idl_loop_run(&ovs_idl_loop);
>          unsigned int new_ovs_cond_seqno
> @@ -2047,12 +2047,12 @@ main(int argc, char *argv[])
>                              if (engine_run_done) {
>                                  engine_set_abort_recompute(true);
>                                  engine_run_done =
engine_run(&en_flow_output,
> -
++engine_run_id);
> +
engine_run_id);
>                              }
>                          } else {
>                              engine_set_abort_recompute(false);
>                              engine_run_done = true;
> -                            engine_run(&en_flow_output, ++engine_run_id);
> +                            engine_run(&en_flow_output, engine_run_id);
>                          }
>                      }
>                      stopwatch_stop(CONTROLLER_LOOP_STOPWATCH_NAME,
> @@ -2097,17 +2097,20 @@ main(int argc, char *argv[])
>                  }
>
>              }
> -            if (old_engine_run_id == engine_run_id || !engine_run_done) {
> -                if (!engine_run_done ||
engine_need_run(&en_flow_output)) {
> -                    VLOG_DBG("engine did not run, force recompute next
time: "
> -                             "br_int %p, chassis %p", br_int, chassis);
> -                    engine_set_force_recompute(true);
> -                    poll_immediate_wake();
> -                } else {
> -                    VLOG_DBG("engine did not run, and it was not needed"
> -                             " either: br_int %p, chassis %p",
> -                             br_int, chassis);
> -                }
> +            if (engine_need_run(&en_flow_output, engine_run_id)) {
> +                VLOG_DBG("engine did not run, force recompute next time:
"
> +                            "br_int %p, chassis %p", br_int, chassis);
> +                engine_set_force_recompute(true);
> +                poll_immediate_wake();
> +            } else if (!engine_run_done) {
> +                VLOG_DBG("engine was aborted, force recompute next time:
"
> +                         "br_int %p, chassis %p", br_int, chassis);
> +                engine_set_force_recompute(true);
> +                poll_immediate_wake();
> +            } else if (!engine_has_run(&en_flow_output, engine_run_id)) {
> +                VLOG_DBG("engine did not run, and it was not needed"
> +                         " either: br_int %p, chassis %p",
> +                         br_int, chassis);
>              } else {
>                  engine_set_force_recompute(false);
>              }
> diff --git a/lib/inc-proc-eng.c b/lib/inc-proc-eng.c
> index 1064a08..ff07ad9 100644
> --- a/lib/inc-proc-eng.c
> +++ b/lib/inc-proc-eng.c
> @@ -129,14 +129,68 @@ engine_ovsdb_node_add_index(struct engine_node
*node, const char *name,
>  }
>
>  bool
> -engine_run(struct engine_node *node, uint64_t run_id)
> +engine_has_run(struct engine_node *node, uint64_t run_id)
> +{
> +    return node->run_id == run_id;
> +}
> +
> +/* Do a full recompute (or at least try). If we're not allowed then
> + * mark the node as "aborted".
> + */
> +static bool
> +engine_recompute(struct engine_node *node, bool forced, bool allowed)
> +{
> +    VLOG_DBG("node: %s, recompute (%s)", node->name,
> +             forced ? "forced" : "triggered");
> +
> +    if (!allowed) {
> +        VLOG_DBG("node: %s, recompute aborted", node->name);
> +        return false;
> +    }
> +
> +    node->run(node);
> +    VLOG_DBG("node: %s, changed: %d", node->name, node->changed);
> +    return true;
> +}
> +
> +/* Return true if the node could be computed, false otherwise. */
> +static bool
> +engine_compute(struct engine_node *node, bool recompute_allowed)
> +{
> +    for (size_t i = 0; i < node->n_inputs; i++) {
> +        /* If the input node data changed call its change handler. */
> +        if (node->inputs[i].node->changed) {
> +            VLOG_DBG("node: %s, handle change for input %s",
> +                     node->name, node->inputs[i].node->name);
> +
> +            /* If the input change can't be handled incrementally, run
> +             * the node handler.
> +             */
> +            if (!node->inputs[i].change_handler(node)) {
> +                VLOG_DBG("node: %s, can't handle change for input %s, "
> +                         "fall back to recompute",
> +                         node->name, node->inputs[i].node->name);
> +                return engine_recompute(node, false, recompute_allowed);
> +            }
> +        }
> +    }
> +
> +    return true;
> +}
> +
> +bool engine_run(struct engine_node *node, uint64_t run_id)
>  {
>      if (node->run_id == run_id) {
> +        /* The node was already updated in this run (could be input for
> +         * multiple other nodes). Stop processing.
> +         */
>          return true;
>      }
> -    node->run_id = run_id;
>
> +    /* Initialize the node for this run. */
> +    node->run_id = run_id;
>      node->changed = false;
> +
>      if (!node->n_inputs) {
>          node->run(node);
>          VLOG_DBG("node: %s, changed: %d", node->name, node->changed);
> @@ -150,59 +204,45 @@ engine_run(struct engine_node *node, uint64_t
run_id)
>      }
>
>      bool need_compute = false;
> -    bool need_recompute = false;
>
>      if (engine_force_recompute) {
> -        need_recompute = true;
> -    } else {
> -        for (size_t i = 0; i < node->n_inputs; i++) {
> -            if (node->inputs[i].node->changed) {
> -                need_compute = true;
> -                if (!node->inputs[i].change_handler) {
> -                    need_recompute = true;
> -                    break;
> -                }
> -            }
> -        }
> +        return engine_recompute(node, true, !engine_abort_recompute);
>      }
>
> -    if (need_recompute) {
> -        VLOG_DBG("node: %s, recompute (%s)", node->name,
> -                 engine_force_recompute ? "forced" : "triggered");
> -        if (engine_abort_recompute) {
> -            VLOG_DBG("node: %s, recompute aborted", node->name);
> -            return false;
> -        }
> -        node->run(node);
> -    } else if (need_compute) {
> -        for (size_t i = 0; i < node->n_inputs; i++) {
> -            if (node->inputs[i].node->changed) {
> -                VLOG_DBG("node: %s, handle change for input %s",
> -                         node->name, node->inputs[i].node->name);
> -                if (!node->inputs[i].change_handler(node)) {
> -                    VLOG_DBG("node: %s, can't handle change for input
%s, "
> -                             "fall back to recompute",
> -                             node->name, node->inputs[i].node->name);
> -                    if (engine_abort_recompute) {
> -                        VLOG_DBG("node: %s, recompute aborted",
node->name);
> -                        return false;
> -                    }
> -                    node->run(node);
> -                    break;
> -                }
> +    /* If any of the inputs updated data but there is no change_handler,
then
> +     * recompute the current node too.
> +     */
> +    for (size_t i = 0; i < node->n_inputs; i++) {
> +        if (node->inputs[i].node->changed) {
> +            need_compute = true;
> +
> +            /* Trigger a recompute if we don't have a change handler. */
> +            if (!node->inputs[i].change_handler) {
> +                return engine_recompute(node, false,
!engine_abort_recompute);
>              }
>          }
>      }
>
> +    if (need_compute) {
> +        /* If we couldn't compute the node we either aborted or triggered
> +         * a full recompute. In any case, stop processing.
> +         */
> +        return engine_compute(node, !engine_abort_recompute);
> +    }
> +
>      VLOG_DBG("node: %s, changed: %d", node->name, node->changed);
>      return true;
>  }
>
>  bool
> -engine_need_run(struct engine_node *node)
> +engine_need_run(struct engine_node *node, uint64_t run_id)
>  {
>      size_t i;
>
> +    if (node->run_id == run_id) {
> +        return false;
> +    }
> +
>      if (!node->n_inputs) {
>          node->run(node);
>          VLOG_DBG("input node: %s, changed: %d", node->name,
node->changed);
> @@ -210,7 +250,7 @@ engine_need_run(struct engine_node *node)
>      }
>
>      for (i = 0; i < node->n_inputs; i++) {
> -        if (engine_need_run(node->inputs[i].node)) {
> +        if (engine_need_run(node->inputs[i].node, run_id)) {
>              return true;
>          }
>      }
> diff --git a/lib/inc-proc-eng.h b/lib/inc-proc-eng.h
> index 3a69dc2..abd41b2 100644
> --- a/lib/inc-proc-eng.h
> +++ b/lib/inc-proc-eng.h
> @@ -130,9 +130,9 @@ bool engine_run(struct engine_node *, uint64_t
run_id);
>   * terminates. */
>  void engine_cleanup(struct engine_node *);
>
> -/* Check if engine needs to run, i.e. any change to be processed. */
> +/* Check if engine needs to run but didn't. */
>  bool
> -engine_need_run(struct engine_node *);
> +engine_need_run(struct engine_node *, uint64_t run_id);
>
>  /* Get the input node with <name> for <node> */
>  struct engine_node * engine_get_input(const char *input_name,
> @@ -159,6 +159,9 @@ const struct engine_context *
engine_get_context(void);
>
>  void engine_set_context(const struct engine_context *);
>
> +/* Return true if the engine has run for 'node' in the 'run_id'
iteration. */
> +bool engine_has_run(struct engine_node *node, uint64_t run_id);
> +
>  struct ed_ovsdb_index {
>      const char *name;
>      struct ovsdb_idl_index *index;
>

Thanks. I applied this patch to master. I will review the rest in the
series.
Dumitru Ceara Nov. 27, 2019, 8:47 a.m. UTC | #2
On Wed, Nov 27, 2019 at 12:26 AM Han Zhou <hzhou@ovn.org> wrote:
>
>
> Thanks. I applied this patch to master. I will review the rest in the series.

Great, thanks!
diff mbox series

Patch

diff --git a/controller/ovn-controller.c b/controller/ovn-controller.c
index 27cb488..c56190f 100644
--- a/controller/ovn-controller.c
+++ b/controller/ovn-controller.c
@@ -1942,7 +1942,6 @@  main(int argc, char *argv[])
                              &pending_pkt);
 
     uint64_t engine_run_id = 0;
-    uint64_t old_engine_run_id = 0;
     bool engine_run_done = true;
 
     unsigned int ovs_cond_seqno = UINT_MAX;
@@ -1952,10 +1951,11 @@  main(int argc, char *argv[])
     exiting = false;
     restart = false;
     while (!exiting) {
+        engine_run_id++;
+
         update_sb_db(ovs_idl_loop.idl, ovnsb_idl_loop.idl);
         update_ssl_config(ovsrec_ssl_table_get(ovs_idl_loop.idl));
         ofctrl_set_probe_interval(get_ofctrl_probe_interval(ovs_idl_loop.idl));
-        old_engine_run_id = engine_run_id;
 
         struct ovsdb_idl_txn *ovs_idl_txn = ovsdb_idl_loop_run(&ovs_idl_loop);
         unsigned int new_ovs_cond_seqno
@@ -2047,12 +2047,12 @@  main(int argc, char *argv[])
                             if (engine_run_done) {
                                 engine_set_abort_recompute(true);
                                 engine_run_done = engine_run(&en_flow_output,
-                                                             ++engine_run_id);
+                                                             engine_run_id);
                             }
                         } else {
                             engine_set_abort_recompute(false);
                             engine_run_done = true;
-                            engine_run(&en_flow_output, ++engine_run_id);
+                            engine_run(&en_flow_output, engine_run_id);
                         }
                     }
                     stopwatch_stop(CONTROLLER_LOOP_STOPWATCH_NAME,
@@ -2097,17 +2097,20 @@  main(int argc, char *argv[])
                 }
 
             }
-            if (old_engine_run_id == engine_run_id || !engine_run_done) {
-                if (!engine_run_done || engine_need_run(&en_flow_output)) {
-                    VLOG_DBG("engine did not run, force recompute next time: "
-                             "br_int %p, chassis %p", br_int, chassis);
-                    engine_set_force_recompute(true);
-                    poll_immediate_wake();
-                } else {
-                    VLOG_DBG("engine did not run, and it was not needed"
-                             " either: br_int %p, chassis %p",
-                             br_int, chassis);
-                }
+            if (engine_need_run(&en_flow_output, engine_run_id)) {
+                VLOG_DBG("engine did not run, force recompute next time: "
+                            "br_int %p, chassis %p", br_int, chassis);
+                engine_set_force_recompute(true);
+                poll_immediate_wake();
+            } else if (!engine_run_done) {
+                VLOG_DBG("engine was aborted, force recompute next time: "
+                         "br_int %p, chassis %p", br_int, chassis);
+                engine_set_force_recompute(true);
+                poll_immediate_wake();
+            } else if (!engine_has_run(&en_flow_output, engine_run_id)) {
+                VLOG_DBG("engine did not run, and it was not needed"
+                         " either: br_int %p, chassis %p",
+                         br_int, chassis);
             } else {
                 engine_set_force_recompute(false);
             }
diff --git a/lib/inc-proc-eng.c b/lib/inc-proc-eng.c
index 1064a08..ff07ad9 100644
--- a/lib/inc-proc-eng.c
+++ b/lib/inc-proc-eng.c
@@ -129,14 +129,68 @@  engine_ovsdb_node_add_index(struct engine_node *node, const char *name,
 }
 
 bool
-engine_run(struct engine_node *node, uint64_t run_id)
+engine_has_run(struct engine_node *node, uint64_t run_id)
+{
+    return node->run_id == run_id;
+}
+
+/* Do a full recompute (or at least try). If we're not allowed then
+ * mark the node as "aborted".
+ */
+static bool
+engine_recompute(struct engine_node *node, bool forced, bool allowed)
+{
+    VLOG_DBG("node: %s, recompute (%s)", node->name,
+             forced ? "forced" : "triggered");
+
+    if (!allowed) {
+        VLOG_DBG("node: %s, recompute aborted", node->name);
+        return false;
+    }
+
+    node->run(node);
+    VLOG_DBG("node: %s, changed: %d", node->name, node->changed);
+    return true;
+}
+
+/* Return true if the node could be computed, false otherwise. */
+static bool
+engine_compute(struct engine_node *node, bool recompute_allowed)
+{
+    for (size_t i = 0; i < node->n_inputs; i++) {
+        /* If the input node data changed call its change handler. */
+        if (node->inputs[i].node->changed) {
+            VLOG_DBG("node: %s, handle change for input %s",
+                     node->name, node->inputs[i].node->name);
+
+            /* If the input change can't be handled incrementally, run
+             * the node handler.
+             */
+            if (!node->inputs[i].change_handler(node)) {
+                VLOG_DBG("node: %s, can't handle change for input %s, "
+                         "fall back to recompute",
+                         node->name, node->inputs[i].node->name);
+                return engine_recompute(node, false, recompute_allowed);
+            }
+        }
+    }
+
+    return true;
+}
+
+bool engine_run(struct engine_node *node, uint64_t run_id)
 {
     if (node->run_id == run_id) {
+        /* The node was already updated in this run (could be input for
+         * multiple other nodes). Stop processing.
+         */
         return true;
     }
-    node->run_id = run_id;
 
+    /* Initialize the node for this run. */
+    node->run_id = run_id;
     node->changed = false;
+
     if (!node->n_inputs) {
         node->run(node);
         VLOG_DBG("node: %s, changed: %d", node->name, node->changed);
@@ -150,59 +204,45 @@  engine_run(struct engine_node *node, uint64_t run_id)
     }
 
     bool need_compute = false;
-    bool need_recompute = false;
 
     if (engine_force_recompute) {
-        need_recompute = true;
-    } else {
-        for (size_t i = 0; i < node->n_inputs; i++) {
-            if (node->inputs[i].node->changed) {
-                need_compute = true;
-                if (!node->inputs[i].change_handler) {
-                    need_recompute = true;
-                    break;
-                }
-            }
-        }
+        return engine_recompute(node, true, !engine_abort_recompute);
     }
 
-    if (need_recompute) {
-        VLOG_DBG("node: %s, recompute (%s)", node->name,
-                 engine_force_recompute ? "forced" : "triggered");
-        if (engine_abort_recompute) {
-            VLOG_DBG("node: %s, recompute aborted", node->name);
-            return false;
-        }
-        node->run(node);
-    } else if (need_compute) {
-        for (size_t i = 0; i < node->n_inputs; i++) {
-            if (node->inputs[i].node->changed) {
-                VLOG_DBG("node: %s, handle change for input %s",
-                         node->name, node->inputs[i].node->name);
-                if (!node->inputs[i].change_handler(node)) {
-                    VLOG_DBG("node: %s, can't handle change for input %s, "
-                             "fall back to recompute",
-                             node->name, node->inputs[i].node->name);
-                    if (engine_abort_recompute) {
-                        VLOG_DBG("node: %s, recompute aborted", node->name);
-                        return false;
-                    }
-                    node->run(node);
-                    break;
-                }
+    /* If any of the inputs updated data but there is no change_handler, then
+     * recompute the current node too.
+     */
+    for (size_t i = 0; i < node->n_inputs; i++) {
+        if (node->inputs[i].node->changed) {
+            need_compute = true;
+
+            /* Trigger a recompute if we don't have a change handler. */
+            if (!node->inputs[i].change_handler) {
+                return engine_recompute(node, false, !engine_abort_recompute);
             }
         }
     }
 
+    if (need_compute) {
+        /* If we couldn't compute the node we either aborted or triggered
+         * a full recompute. In any case, stop processing.
+         */
+        return engine_compute(node, !engine_abort_recompute);
+    }
+
     VLOG_DBG("node: %s, changed: %d", node->name, node->changed);
     return true;
 }
 
 bool
-engine_need_run(struct engine_node *node)
+engine_need_run(struct engine_node *node, uint64_t run_id)
 {
     size_t i;
 
+    if (node->run_id == run_id) {
+        return false;
+    }
+
     if (!node->n_inputs) {
         node->run(node);
         VLOG_DBG("input node: %s, changed: %d", node->name, node->changed);
@@ -210,7 +250,7 @@  engine_need_run(struct engine_node *node)
     }
 
     for (i = 0; i < node->n_inputs; i++) {
-        if (engine_need_run(node->inputs[i].node)) {
+        if (engine_need_run(node->inputs[i].node, run_id)) {
             return true;
         }
     }
diff --git a/lib/inc-proc-eng.h b/lib/inc-proc-eng.h
index 3a69dc2..abd41b2 100644
--- a/lib/inc-proc-eng.h
+++ b/lib/inc-proc-eng.h
@@ -130,9 +130,9 @@  bool engine_run(struct engine_node *, uint64_t run_id);
  * terminates. */
 void engine_cleanup(struct engine_node *);
 
-/* Check if engine needs to run, i.e. any change to be processed. */
+/* Check if engine needs to run but didn't. */
 bool
-engine_need_run(struct engine_node *);
+engine_need_run(struct engine_node *, uint64_t run_id);
 
 /* Get the input node with <name> for <node> */
 struct engine_node * engine_get_input(const char *input_name,
@@ -159,6 +159,9 @@  const struct engine_context * engine_get_context(void);
 
 void engine_set_context(const struct engine_context *);
 
+/* Return true if the engine has run for 'node' in the 'run_id' iteration. */
+bool engine_has_run(struct engine_node *node, uint64_t run_id);
+
 struct ed_ovsdb_index {
     const char *name;
     struct ovsdb_idl_index *index;