diff mbox series

[ovs-dev,01/10] ovn-controller: Incremental processing engine

Message ID 1526351009-14114-2-git-send-email-hzhou8@ebay.com
State Superseded
Headers show
Series ovn-controller incremental processing | expand

Commit Message

Han Zhou May 15, 2018, 2:23 a.m. UTC
This patch implements the engine which will be used in future patches
for ovn-controller incremental processing.

Signed-off-by: Han Zhou <hzhou8@ebay.com>
---
 ovn/lib/automake.mk    |   4 +-
 ovn/lib/inc-proc-eng.c | 125 +++++++++++++++++++++++++++
 ovn/lib/inc-proc-eng.h | 224 +++++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 352 insertions(+), 1 deletion(-)
 create mode 100644 ovn/lib/inc-proc-eng.c
 create mode 100644 ovn/lib/inc-proc-eng.h

Comments

Ben Pfaff May 23, 2018, 11 p.m. UTC | #1
On Mon, May 14, 2018 at 07:23:20PM -0700, Han Zhou wrote:
> This patch implements the engine which will be used in future patches
> for ovn-controller incremental processing.
> 
> Signed-off-by: Han Zhou <hzhou8@ebay.com>

Thanks a lot for working on this.  I read through it in detail and I
have some comments.  Most of my comments are trivial, so I'll just
append them in the form of an incremental diff that I hope you will fold
in.

I don't think that engine_get_input() makes sense as an inline
function.  I would prefer to see it in the .c file.

I am not sure whether engine_add_input() makes sense as an inline.

I think I won't have serious comments on the technique until I've read
more of the patches.

--8<--------------------------cut here-------------------------->8--

diff --git a/ovn/lib/inc-proc-eng.c b/ovn/lib/inc-proc-eng.c
index 54c7fd6b9723..4699702ae740 100644
--- a/ovn/lib/inc-proc-eng.c
+++ b/ovn/lib/inc-proc-eng.c
@@ -67,18 +67,14 @@ engine_run(struct engine_node *node, uint64_t run_id)
     }
     node->run_id = run_id;
 
-    if (node->changed) {
-        node->changed = false;
-    }
+    node->changed = false;
     if (!node->n_inputs) {
         node->run(node);
         VLOG_DBG("node: %s, changed: %d", node->name, node->changed);
         return;
     }
 
-    size_t i;
-
-    for (i = 0; i < node->n_inputs; i++) {
+    for (size_t i = 0; i < node->n_inputs; i++) {
         engine_run(node->inputs[i].node, run_id);
     }
 
@@ -88,7 +84,7 @@ engine_run(struct engine_node *node, uint64_t run_id)
     if (engine_force_recompute) {
         need_recompute = true;
     } else {
-        for (i = 0; i < node->n_inputs; i++) {
+        for (size_t i = 0; i < node->n_inputs; i++) {
             if (node->inputs[i].node->changed) {
                 need_compute = true;
                 if (!node->inputs[i].change_handler) {
@@ -104,7 +100,7 @@ engine_run(struct engine_node *node, uint64_t run_id)
                  engine_force_recompute ? "forced" : "triggered");
         node->run(node);
     } else if (need_compute) {
-        for (i = 0; i < node->n_inputs; i++) {
+        for (size_t i = 0; i < node->n_inputs; i++) {
             if (node->inputs[i].node->changed) {
                 VLOG_DBG("node: %s, handle change for input %s",
                          node->name, node->inputs[i].node->name);
@@ -120,6 +116,4 @@ engine_run(struct engine_node *node, uint64_t run_id)
     }
 
     VLOG_DBG("node: %s, changed: %d", node->name, node->changed);
-
 }
-
diff --git a/ovn/lib/inc-proc-eng.h b/ovn/lib/inc-proc-eng.h
index 3e0ec167636d..b09ce9f7c7c0 100644
--- a/ovn/lib/inc-proc-eng.h
+++ b/ovn/lib/inc-proc-eng.h
@@ -20,11 +20,11 @@
 /* The Incremental Processing Engine is a framework for incrementally
  * processing changes from different inputs. The main user is ovn-controller.
  * To compute desired states (e.g. openflow rules) based on many inputs (e.g.
- * south-bound DB tables, local OVSDB interfaces, etc.), it is straighforward
+ * south-bound DB tables, local OVSDB interfaces, etc.), it is straightforward
  * to recompute everything when there is any change in any inputs, but it
  * is inefficient when the size of the input data becomes large. Instead,
  * tracking the changes and update the desired states based on what's changed
- * is more efficient and scalable. However, it is not straighforward to
+ * is more efficient and scalable. However, it is not straightforward to
  * implement the change-based processing when there are a big number of
  * inputs. In addition, what makes it more complicated is that intermediate
  * results needs to be computed, which needs to be reused in different part
@@ -40,7 +40,7 @@
  *
  * The engine is composed of engine_nodes. Each engine_node is either
  * an input, an output or both (intermediate result). Each engine node
- * maintains its own data, which is persistent across interations. Each node
+ * maintains its own data, which is persistent across interactions. Each node
  * has zero to ENGINE_MAX_INPUT inputs, which creates a DAG (directed
  * acyclic graph). For each input of each engine_node, there is a
  * change_handler to process changes of that input, and update the data
@@ -65,8 +65,7 @@
 struct engine_node;
 
 struct engine_node_input {
-
-    /* the input node */
+    /* The input node. */
     struct engine_node *node;
 
     /* Change handler for changes of the input node. The changes may need to be
@@ -78,58 +77,54 @@ struct engine_node_input {
 };
 
 struct engine_node {
-
     /* A unique id to distinguish each iteration of the engine_run(). */
     uint64_t run_id;
 
-    /* A unique name for each node */
-    char* name;
+    /* A unique name for each node. */
+    char *name;
 
-    /* Number of inputs of this node */
+    /* Number of inputs of this node. */
     size_t n_inputs;
 
-    /* Inputs of this node */
+    /* Inputs of this node. */
     struct engine_node_input inputs[ENGINE_MAX_INPUT];
 
     /* Data of this node. It is vague and interpreted by the related functions.
      * The content of the data should be changed only by the change_handlers
      * and run() function of the current node. Users should ensure that the
      * data is read-only in change-handlers of the nodes that depends on this
-     * node. */ 
+     * node. */
     void *data;
 
-    /* Whether the data is changed in the last engine run */
+    /* Whether the data changed in the last engine run. */
     bool changed;
 
-    /* Context data for the engine processing, such as OVSDB IDLs */
+    /* Context data for the engine processing, such as OVSDB IDLs. */
     void *context;
 
     /* Method to initialize data. It may be NULL. */
-    void (*init)(struct engine_node *node);
+    void (*init)(struct engine_node *);
 
     /* Method to clean up data. It may be NULL. */
-    void (*cleanup)(struct engine_node *node);
+    void (*cleanup)(struct engine_node *);
 
-    /* Fully processing all inputs of this node and regenerate the data
-     * of this node */
-    void (*run)(struct engine_node *node);
+    /* Fully processes all inputs of this node and regenerates the data
+     * of this node. */
+    void (*run)(struct engine_node *);
 };
 
 /* Initialize the data for the engine nodes recursively. It calls each node's
  * init() method if not NULL. It should be called before the main loop. */
-void
-engine_init(struct engine_node *node);
+void engine_init(struct engine_node *);
 
 /* Execute the processing recursively, which should be called in the main
  * loop. */
-void
-engine_run(struct engine_node *node, uint64_t run_id);
+void engine_run(struct engine_node *, uint64_t run_id);
 
 /* Clean up the data for the engine nodes recursively. It calls each node's
  * cleanup() method if not NULL. It should be called before the program
  * terminates. */
-void
-engine_cleanup(struct engine_node *node);
+void engine_cleanup(struct engine_node *);
 
 /* Get the input node with <name> for <node> */
 static inline struct engine_node *
@@ -147,7 +142,7 @@ engine_get_input(const char *input_name, struct engine_node *node)
 /* Add an input (dependency) for <node>, with corresponding change_handler,
  * which can be NULL. If the change_handler is NULL, the engine will not
  * be able to process the change incrementally, and will fall back to call
- * the run method to recompute */
+ * the run method to recompute. */
 static inline void
 engine_add_input(struct engine_node *node, struct engine_node *input,
     bool (*change_handler)(struct engine_node *node))
@@ -162,10 +157,9 @@ engine_add_input(struct engine_node *node, struct engine_node *input,
  * in circumstances when we are not sure there is change or not, or
  * when there is change but the engine couldn't be executed in that
  * iteration, and the change can't be tracked across iterations */
-void
-engine_set_force_recompute(bool val);
+void engine_set_force_recompute(bool val);
 
-/* Macro to define an engine node */
+/* Macro to define an engine node. */
 #define ENGINE_NODE(NAME, NAME_STR, CTX) \
     struct engine_node en_##NAME = { \
         .name = NAME_STR, \
Han Zhou May 24, 2018, 10:15 p.m. UTC | #2
On Wed, May 23, 2018 at 4:00 PM, Ben Pfaff <blp@ovn.org> wrote:
>
> On Mon, May 14, 2018 at 07:23:20PM -0700, Han Zhou wrote:
> > This patch implements the engine which will be used in future patches
> > for ovn-controller incremental processing.
> >
> > Signed-off-by: Han Zhou <hzhou8@ebay.com>
>
> Thanks a lot for working on this.  I read through it in detail and I
> have some comments.  Most of my comments are trivial, so I'll just
> append them in the form of an incremental diff that I hope you will fold
> in.

Thanks for reviewing! I will apply the suggested changes (and I will follow
the style for future patches).

>
> I don't think that engine_get_input() makes sense as an inline
> function.  I would prefer to see it in the .c file.

Agree.

>
> I am not sure whether engine_add_input() makes sense as an inline.

I think it's unnecessary to inline.

I rebased the patch series on master and submitted v2 with suggested
changes applied:

https://patchwork.ozlabs.org/project/openvswitch/list/?series=46497

(Somehow the cover letter always missing in patchwork...)
Ben Pfaff May 24, 2018, 10:30 p.m. UTC | #3
On Thu, May 24, 2018 at 03:15:13PM -0700, Han Zhou wrote:
> (Somehow the cover letter always missing in patchwork...)

Yeah, patchwork only tracks patches.

Cover letters don't get into Git, either, since they don't become part
of commits, so when there's important information in a cover letter it's
best to repeat it in some commit message.
diff mbox series

Patch

diff --git a/ovn/lib/automake.mk b/ovn/lib/automake.mk
index 6178fc2..c1d37c5 100644
--- a/ovn/lib/automake.mk
+++ b/ovn/lib/automake.mk
@@ -17,7 +17,9 @@  ovn_lib_libovn_la_SOURCES = \
 	ovn/lib/ovn-util.c \
 	ovn/lib/ovn-util.h \
 	ovn/lib/logical-fields.c \
-	ovn/lib/logical-fields.h
+	ovn/lib/logical-fields.h \
+	ovn/lib/inc-proc-eng.c \
+	ovn/lib/inc-proc-eng.h
 nodist_ovn_lib_libovn_la_SOURCES = \
 	ovn/lib/ovn-nb-idl.c \
 	ovn/lib/ovn-nb-idl.h \
diff --git a/ovn/lib/inc-proc-eng.c b/ovn/lib/inc-proc-eng.c
new file mode 100644
index 0000000..54c7fd6
--- /dev/null
+++ b/ovn/lib/inc-proc-eng.c
@@ -0,0 +1,125 @@ 
+/*
+ * Copyright (c) 2018 eBay Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <config.h>
+
+#include <errno.h>
+#include <getopt.h>
+#include <signal.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "openvswitch/dynamic-string.h"
+#include "openvswitch/hmap.h"
+#include "openvswitch/vlog.h"
+#include "inc-proc-eng.h"
+
+VLOG_DEFINE_THIS_MODULE(inc_proc_eng);
+
+static bool engine_force_recompute = false;
+
+void
+engine_set_force_recompute(bool val)
+{
+    engine_force_recompute = val;
+}
+
+void
+engine_init(struct engine_node *node)
+{
+    for (size_t i = 0; i < node->n_inputs; i++) {
+        engine_init(node->inputs[i].node);
+    }
+    if (node->init) {
+        node->init(node);
+    }
+}
+
+void
+engine_cleanup(struct engine_node *node)
+{
+    for (size_t i = 0; i < node->n_inputs; i++) {
+        engine_cleanup(node->inputs[i].node);
+    }
+    if (node->cleanup) {
+        node->cleanup(node);
+    }
+}
+
+void
+engine_run(struct engine_node *node, uint64_t run_id)
+{
+    if (node->run_id == run_id) {
+        return;
+    }
+    node->run_id = run_id;
+
+    if (node->changed) {
+        node->changed = false;
+    }
+    if (!node->n_inputs) {
+        node->run(node);
+        VLOG_DBG("node: %s, changed: %d", node->name, node->changed);
+        return;
+    }
+
+    size_t i;
+
+    for (i = 0; i < node->n_inputs; i++) {
+        engine_run(node->inputs[i].node, run_id);
+    }
+
+    bool need_compute = false;
+    bool need_recompute = false;
+
+    if (engine_force_recompute) {
+        need_recompute = true;
+    } else {
+        for (i = 0; i < node->n_inputs; i++) {
+            if (node->inputs[i].node->changed) {
+                need_compute = true;
+                if (!node->inputs[i].change_handler) {
+                    need_recompute = true;
+                    break;
+                }
+            }
+        }
+    }
+
+    if (need_recompute) {
+        VLOG_DBG("node: %s, recompute (%s)", node->name,
+                 engine_force_recompute ? "forced" : "triggered");
+        node->run(node);
+    } else if (need_compute) {
+        for (i = 0; i < node->n_inputs; i++) {
+            if (node->inputs[i].node->changed) {
+                VLOG_DBG("node: %s, handle change for input %s",
+                         node->name, node->inputs[i].node->name);
+                if (!node->inputs[i].change_handler(node)) {
+                    VLOG_DBG("node: %s, can't handle change for input %s, "
+                             "fall back to recompute",
+                             node->name, node->inputs[i].node->name);
+                    node->run(node);
+                    break;
+                }
+            }
+        }
+    }
+
+    VLOG_DBG("node: %s, changed: %d", node->name, node->changed);
+
+}
+
diff --git a/ovn/lib/inc-proc-eng.h b/ovn/lib/inc-proc-eng.h
new file mode 100644
index 0000000..3e0ec16
--- /dev/null
+++ b/ovn/lib/inc-proc-eng.h
@@ -0,0 +1,224 @@ 
+/*
+ * Copyright (c) 2018 eBay Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef INC_PROC_ENG_H
+#define INC_PROC_ENG_H 1
+
+/* The Incremental Processing Engine is a framework for incrementally
+ * processing changes from different inputs. The main user is ovn-controller.
+ * To compute desired states (e.g. openflow rules) based on many inputs (e.g.
+ * south-bound DB tables, local OVSDB interfaces, etc.), it is straighforward
+ * to recompute everything when there is any change in any inputs, but it
+ * is inefficient when the size of the input data becomes large. Instead,
+ * tracking the changes and update the desired states based on what's changed
+ * is more efficient and scalable. However, it is not straighforward to
+ * implement the change-based processing when there are a big number of
+ * inputs. In addition, what makes it more complicated is that intermediate
+ * results needs to be computed, which needs to be reused in different part
+ * of the processing and finally generates the final desired states. It is
+ * proved to be difficult and error-prone to implement this kind of complex
+ * processing by ad-hoc implementation.
+ *
+ * This framework is to provide a generic way to solve the above problem.
+ * It does not understand the processing logic, but provides a unified way
+ * to describe the inputs and dependencies clearly, with interfaces for
+ * users to implement the processing logic for how to handle each input
+ * changes.
+ *
+ * The engine is composed of engine_nodes. Each engine_node is either
+ * an input, an output or both (intermediate result). Each engine node
+ * maintains its own data, which is persistent across interations. Each node
+ * has zero to ENGINE_MAX_INPUT inputs, which creates a DAG (directed
+ * acyclic graph). For each input of each engine_node, there is a
+ * change_handler to process changes of that input, and update the data
+ * of the engine_node. Then the user can simply call the run() method
+ * of the engine so that the processing will happen in the order according
+ * to the dependencies defined and handle the changes incrementally.
+ *
+ * While the more fine-grained dependencies and change-handlers are
+ * implemented, the more efficient the processing will be, it is not
+ * realistic to implement all change-processing for all inputs (and
+ * intermediate results). The engine doesn't require change-handler to be
+ * implemented for every input of every node. Users can choose to implement
+ * the most important change-handlers (for the changes happens most
+ * frequently) for overall performance. When there is no change_handler
+ * defined for a certain input on a certain engine_node, the run() method
+ * of the engine_node will be called to fall-back to a full recompute
+ * against all its inputs.
+ */
+
+#define ENGINE_MAX_INPUT 256
+
+struct engine_node;
+
+struct engine_node_input {
+
+    /* the input node */
+    struct engine_node *node;
+
+    /* Change handler for changes of the input node. The changes may need to be
+     * evaluated against all the other inputs. Returns:
+     *  - true: if change can be handled
+     *  - false: if change cannot be handled (indicating full recompute needed)
+     */
+    bool (*change_handler)(struct engine_node *node);
+};
+
+struct engine_node {
+
+    /* A unique id to distinguish each iteration of the engine_run(). */
+    uint64_t run_id;
+
+    /* A unique name for each node */
+    char* name;
+
+    /* Number of inputs of this node */
+    size_t n_inputs;
+
+    /* Inputs of this node */
+    struct engine_node_input inputs[ENGINE_MAX_INPUT];
+
+    /* Data of this node. It is vague and interpreted by the related functions.
+     * The content of the data should be changed only by the change_handlers
+     * and run() function of the current node. Users should ensure that the
+     * data is read-only in change-handlers of the nodes that depends on this
+     * node. */ 
+    void *data;
+
+    /* Whether the data is changed in the last engine run */
+    bool changed;
+
+    /* Context data for the engine processing, such as OVSDB IDLs */
+    void *context;
+
+    /* Method to initialize data. It may be NULL. */
+    void (*init)(struct engine_node *node);
+
+    /* Method to clean up data. It may be NULL. */
+    void (*cleanup)(struct engine_node *node);
+
+    /* Fully processing all inputs of this node and regenerate the data
+     * of this node */
+    void (*run)(struct engine_node *node);
+};
+
+/* Initialize the data for the engine nodes recursively. It calls each node's
+ * init() method if not NULL. It should be called before the main loop. */
+void
+engine_init(struct engine_node *node);
+
+/* Execute the processing recursively, which should be called in the main
+ * loop. */
+void
+engine_run(struct engine_node *node, uint64_t run_id);
+
+/* Clean up the data for the engine nodes recursively. It calls each node's
+ * cleanup() method if not NULL. It should be called before the program
+ * terminates. */
+void
+engine_cleanup(struct engine_node *node);
+
+/* Get the input node with <name> for <node> */
+static inline struct engine_node *
+engine_get_input(const char *input_name, struct engine_node *node)
+{
+    size_t i;
+    for (i = 0; i < node->n_inputs; i++) {
+        if (!strcmp(node->inputs[i].node->name, input_name)) {
+            return node->inputs[i].node;
+        }
+    }
+    return NULL;
+}
+
+/* Add an input (dependency) for <node>, with corresponding change_handler,
+ * which can be NULL. If the change_handler is NULL, the engine will not
+ * be able to process the change incrementally, and will fall back to call
+ * the run method to recompute */
+static inline void
+engine_add_input(struct engine_node *node, struct engine_node *input,
+    bool (*change_handler)(struct engine_node *node))
+{
+    ovs_assert(node->n_inputs < ENGINE_MAX_INPUT);
+    node->inputs[node->n_inputs].node = input;
+    node->inputs[node->n_inputs].change_handler = change_handler;
+    node->n_inputs ++;
+}
+
+/* Force the engine to recompute everything if set to true. It is used
+ * in circumstances when we are not sure there is change or not, or
+ * when there is change but the engine couldn't be executed in that
+ * iteration, and the change can't be tracked across iterations */
+void
+engine_set_force_recompute(bool val);
+
+/* Macro to define an engine node */
+#define ENGINE_NODE(NAME, NAME_STR, CTX) \
+    struct engine_node en_##NAME = { \
+        .name = NAME_STR, \
+        .data = &ed_##NAME, \
+        .context = CTX, \
+        .init = NAME##_init, \
+        .run = NAME##_run, \
+        .cleanup = NAME##_cleanup, \
+    };
+
+/* Macro to define member functions of an engine node which represents
+ * a table of OVSDB */
+#define ENGINE_FUNC_OVSDB(DB_NAME, TBL_NAME, IDL) \
+static void \
+DB_NAME##_##TBL_NAME##_run(struct engine_node *node) \
+{ \
+    static bool first_run = true; \
+    if (first_run) { \
+        first_run = false; \
+        node->changed = true; \
+        return; \
+    } \
+    struct controller_ctx *ctx = (struct controller_ctx *)node->context; \
+    if (DB_NAME##rec_##TBL_NAME##_track_get_first(ctx->IDL)) { \
+        node->changed = true; \
+        return; \
+    } \
+    node->changed = false; \
+} \
+static void (*DB_NAME##_##TBL_NAME##_init)(struct engine_node *node) \
+            = NULL; \
+static void (*DB_NAME##_##TBL_NAME##_cleanup)(struct engine_node *node) \
+            = NULL;
+
+/* Macro to define member functions of an engine node which represents
+ * a table of OVN SB DB */
+#define ENGINE_FUNC_SB(TBL_NAME) \
+    ENGINE_FUNC_OVSDB(sb, TBL_NAME, ovnsb_idl)
+
+/* Macro to define member functions of an engine node which represents
+ * a table of open_vswitch DB */
+#define ENGINE_FUNC_OVS(TBL_NAME) \
+    ENGINE_FUNC_OVSDB(ovs, TBL_NAME, ovs_idl)
+
+/* Macro to define an engine node which represents a table of OVN SB DB */
+#define ENGINE_NODE_SB(TBL_NAME, TBL_NAME_STR, CTX) \
+    void *ed_sb_##TBL_NAME; \
+    ENGINE_NODE(sb_##TBL_NAME, TBL_NAME_STR, CTX)
+
+/* Macro to define an engine node which represents a table of open_vswitch
+ * DB */
+#define ENGINE_NODE_OVS(TBL_NAME, TBL_NAME_STR, CTX) \
+    void *ed_ovs_##TBL_NAME; \
+    ENGINE_NODE(ovs_##TBL_NAME, TBL_NAME_STR, CTX)
+
+#endif /* ovn/lib/inc-proc-eng.h */