[ovs-dev,RFC,04/14] ovn-controller: Initial use of incremental engine.

Message ID 1532480380-97578-5-git-send-email-hzhou8@ebay.com
State RFC
Headers show
Series
  • ovn-controller incremental processing.
Related show

Commit Message

Han Zhou July 25, 2018, 12:59 a.m.
Incremental proccessing engine is used to compute flows. In this
patch we create below engine nodes:
    - Engine nodes for each OVSDB table in local OVS DB and SB DB.
    - runtime_data: compute and maintain intermediate result such
                    as local_datapath, etc.
    - flow_output: compute and maintain computed flow table.

In each iteration if there is any input change then everything is
still recomputed, but there is no recompute if there is no change.
For example, pinctrl input will not trigger flow recompute any
more.

Signed-off-by: Han Zhou <hzhou8@ebay.com>
---
 ovn/controller/binding.c        |   1 +
 ovn/controller/ofctrl.c         |  21 +-
 ovn/controller/ofctrl.h         |   5 +-
 ovn/controller/ovn-controller.c | 756 ++++++++++++++++++++++++++++++----------
 4 files changed, 595 insertions(+), 188 deletions(-)

Patch

diff --git a/ovn/controller/binding.c b/ovn/controller/binding.c
index 8d5f13d..a4b30cb 100644
--- a/ovn/controller/binding.c
+++ b/ovn/controller/binding.c
@@ -481,6 +481,7 @@  consider_local_datapath(struct ovsdb_idl_txn *ovnsb_idl_txn,
         update_local_lport_ids(local_lport_ids, binding_rec);
     }
 
+    ovs_assert(ovnsb_idl_txn);
     if (ovnsb_idl_txn) {
         const char *vif_chassis = smap_get(&binding_rec->options,
                                            "requested-chassis");
diff --git a/ovn/controller/ofctrl.c b/ovn/controller/ofctrl.c
index 349de3a..134f0e5 100644
--- a/ovn/controller/ofctrl.c
+++ b/ovn/controller/ofctrl.c
@@ -477,11 +477,21 @@  recv_S_UPDATE_FLOWS(const struct ofp_header *oh, enum ofptype type,
     }
 }
 
+
+enum mf_field_id
+ofctrl_get_mf_field_id(void)
+{
+    if (!rconn_is_connected(swconn)) {
+        return 0;
+    }
+    return (state == S_CLEAR_FLOWS || state == S_UPDATE_FLOWS
+            ? mff_ovn_geneve : 0);
+}
+
 /* Runs the OpenFlow state machine against 'br_int', which is local to the
  * hypervisor on which we are running.  Attempts to negotiate a Geneve option
- * field for class OVN_GENEVE_CLASS, type OVN_GENEVE_TYPE.  If successful,
- * returns the MFF_* field ID for the option, otherwise returns 0. */
-enum mf_field_id
+ * field for class OVN_GENEVE_CLASS, type OVN_GENEVE_TYPE. */
+void
 ofctrl_run(const struct ovsrec_bridge *br_int, struct shash *pending_ct_zones)
 {
     char *target = xasprintf("unix:%s/%s.mgmt", ovs_rundir(), br_int->name);
@@ -494,7 +504,7 @@  ofctrl_run(const struct ovsrec_bridge *br_int, struct shash *pending_ct_zones)
     rconn_run(swconn);
 
     if (!rconn_is_connected(swconn)) {
-        return 0;
+        return;
     }
     if (seqno != rconn_get_connection_seqno(swconn)) {
         seqno = rconn_get_connection_seqno(swconn);
@@ -557,9 +567,6 @@  ofctrl_run(const struct ovsrec_bridge *br_int, struct shash *pending_ct_zones)
          * point, so ensure that we come back again without waiting. */
         poll_immediate_wake();
     }
-
-    return (state == S_CLEAR_FLOWS || state == S_UPDATE_FLOWS
-            ? mff_ovn_geneve : 0);
 }
 
 void
diff --git a/ovn/controller/ofctrl.h b/ovn/controller/ofctrl.h
index 886b9bd..9346f5c 100644
--- a/ovn/controller/ofctrl.h
+++ b/ovn/controller/ofctrl.h
@@ -32,8 +32,9 @@  struct shash;
 /* Interface for OVN main loop. */
 void ofctrl_init(struct ovn_extend_table *group_table,
                  struct ovn_extend_table *meter_table);
-enum mf_field_id ofctrl_run(const struct ovsrec_bridge *br_int,
-                            struct shash *pending_ct_zones);
+void ofctrl_run(const struct ovsrec_bridge *br_int,
+                struct shash *pending_ct_zones);
+enum mf_field_id ofctrl_get_mf_field_id(void);
 bool ofctrl_can_put(void);
 void ofctrl_put(struct hmap *flow_table, struct shash *pending_ct_zones,
                 int64_t nb_cfg);
diff --git a/ovn/controller/ovn-controller.c b/ovn/controller/ovn-controller.c
index 9243466..0a3a803 100644
--- a/ovn/controller/ovn-controller.c
+++ b/ovn/controller/ovn-controller.c
@@ -60,6 +60,7 @@ 
 #include "timeval.h"
 #include "timer.h"
 #include "stopwatch.h"
+#include "ovn/lib/inc-proc-eng.h"
 
 VLOG_DEFINE_THIS_MODULE(main);
 
@@ -195,15 +196,27 @@  update_sb_monitors(struct ovsdb_idl *ovnsb_idl,
     ovsdb_idl_condition_destroy(&dns);
 }
 
+static const char *
+br_int_name(const struct ovsrec_open_vswitch *cfg)
+{
+    return smap_get_def(&cfg->external_ids, "ovn-bridge", DEFAULT_BRIDGE_NAME);
+}
+
 static const struct ovsrec_bridge *
 create_br_int(struct ovsdb_idl_txn *ovs_idl_txn,
-              const struct ovsrec_open_vswitch *cfg,
-              const char *bridge_name)
+              const struct ovsrec_open_vswitch_table *ovs_table)
 {
     if (!ovs_idl_txn) {
         return NULL;
     }
 
+    const struct ovsrec_open_vswitch *cfg;
+    cfg = ovsrec_open_vswitch_table_first(ovs_table);
+    if (!cfg) {
+        return NULL;
+    }
+    const char *bridge_name = br_int_name(cfg);
+
     ovsdb_idl_txn_add_comment(ovs_idl_txn,
             "ovn-controller: creating integration bridge '%s'", bridge_name);
 
@@ -238,8 +251,7 @@  create_br_int(struct ovsdb_idl_txn *ovs_idl_txn,
 }
 
 static const struct ovsrec_bridge *
-get_br_int(struct ovsdb_idl_txn *ovs_idl_txn,
-           const struct ovsrec_bridge_table *bridge_table,
+get_br_int(const struct ovsrec_bridge_table *bridge_table,
            const struct ovsrec_open_vswitch_table *ovs_table)
 {
     const struct ovsrec_open_vswitch *cfg;
@@ -248,14 +260,7 @@  get_br_int(struct ovsdb_idl_txn *ovs_idl_txn,
         return NULL;
     }
 
-    const char *br_int_name = smap_get_def(&cfg->external_ids, "ovn-bridge",
-                                           DEFAULT_BRIDGE_NAME);
-
-    const struct ovsrec_bridge *br = get_bridge(bridge_table, br_int_name);
-    if (!br) {
-        return create_br_int(ovs_idl_txn, cfg, br_int_name);
-    }
-    return br;
+    return get_bridge(bridge_table, br_int_name(cfg));
 }
 
 static const char *
@@ -468,11 +473,8 @@  restore_ct_zones(const struct ovsrec_bridge_table *bridge_table,
         return;
     }
 
-    const char *br_int_name = smap_get_def(&cfg->external_ids, "ovn-bridge",
-                                           DEFAULT_BRIDGE_NAME);
-
     const struct ovsrec_bridge *br_int;
-    br_int = get_bridge(bridge_table, br_int_name);
+    br_int = get_bridge(bridge_table, br_int_name(cfg));
     if (!br_int) {
         /* If the integration bridge hasn't been defined, assume that
          * any existing ct-zone definitions aren't valid. */
@@ -541,6 +543,389 @@  ctrl_register_ovs_idl(struct ovsdb_idl *ovs_idl)
     physical_register_ovs_idl(ovs_idl);
 }
 
+#define SB_NODES \
+    SB_NODE(chassis, "chassis") \
+    SB_NODE(encap, "encap") \
+    SB_NODE(address_set, "address_set") \
+    SB_NODE(port_group, "port_group") \
+    SB_NODE(multicast_group, "multicast_group") \
+    SB_NODE(datapath_binding, "datapath_binding") \
+    SB_NODE(port_binding, "port_binding") \
+    SB_NODE(mac_binding, "mac_binding") \
+    SB_NODE(logical_flow, "logical_flow") \
+    SB_NODE(dhcp_options, "dhcp_options") \
+    SB_NODE(dhcpv6_options, "dhcpv6_options") \
+    SB_NODE(dns, "dns") \
+    SB_NODE(gateway_chassis, "gateway_chassis")
+
+enum sb_engine_node {
+#define SB_NODE(NAME, NAME_STR) SB_##NAME,
+    SB_NODES
+#undef SB_NODE
+};
+
+const char *sb_engine_node_names[] = {
+#define SB_NODE(NAME, NAME_STR) "SB_"NAME_STR,
+    SB_NODES
+#undef SB_NODE
+};
+
+#define SB_NODE_NAME(NAME) sb_engine_node_names[SB_##NAME]
+
+#define SB_NODE(NAME, NAME_STR) ENGINE_FUNC_SB(NAME);
+    SB_NODES
+#undef SB_NODE
+
+#define OVS_NODES \
+    OVS_NODE(open_vswitch, "open_vswitch") \
+    OVS_NODE(bridge, "bridge") \
+    OVS_NODE(port, "port") \
+    OVS_NODE(qos, "qos") \
+    OVS_NODE(interface, "interface")
+
+enum ovs_engine_node {
+#define OVS_NODE(NAME, NAME_STR) OVS_##NAME,
+    OVS_NODES
+#undef OVS_NODE
+};
+
+const char *ovs_engine_node_names[] = {
+#define OVS_NODE(NAME, NAME_STR) "OVS_"NAME_STR,
+    OVS_NODES
+#undef OVS_NODE
+};
+
+#define OVS_NODE_NAME(NAME) ovs_engine_node_names[OVS_##NAME]
+
+#define OVS_NODE(NAME, NAME_STR) ENGINE_FUNC_OVS(NAME);
+    OVS_NODES
+#undef OVS_NODE
+
+
+struct ed_type_runtime_data {
+    /* Contains "struct local_datapath" nodes. */
+    struct hmap local_datapaths;
+
+    /* Contains the name of each logical port resident on the local
+     * hypervisor.  These logical ports include the VIFs (and their child
+     * logical ports, if any) that belong to VMs running on the hypervisor,
+     * l2gateway ports for which options:l2gateway-chassis designates the
+     * local hypervisor, and localnet ports. */
+    struct sset local_lports;
+
+    /* Contains the same ports as local_lports, but in the format:
+     * <datapath-tunnel-key>_<port-tunnel-key> */
+    struct sset local_lport_ids;
+    struct sset active_tunnels;
+    struct shash addr_sets;
+    struct shash port_groups;
+
+    /* connection tracking zones. */
+    unsigned long ct_zone_bitmap[BITMAP_N_LONGS(MAX_CT_ZONES)];
+    struct shash pending_ct_zones;
+    struct simap ct_zones;
+};
+
+static void
+en_runtime_data_init(struct engine_node *node)
+{
+    struct ed_type_runtime_data *data =
+        (struct ed_type_runtime_data *)node->data;
+    struct ovsrec_open_vswitch_table *ovs_table =
+        (struct ovsrec_open_vswitch_table *)EN_OVSDB_GET(
+            engine_get_input("OVS_open_vswitch", node));
+    struct ovsrec_bridge_table *bridge_table =
+        (struct ovsrec_bridge_table *)EN_OVSDB_GET(
+            engine_get_input("OVS_bridge", node));
+    hmap_init(&data->local_datapaths);
+    sset_init(&data->local_lports);
+    sset_init(&data->local_lport_ids);
+    sset_init(&data->active_tunnels);
+    shash_init(&data->addr_sets);
+    shash_init(&data->port_groups);
+    shash_init(&data->pending_ct_zones);
+    simap_init(&data->ct_zones);
+
+    /* Initialize connection tracking zones. */
+    memset(data->ct_zone_bitmap, 0, sizeof data->ct_zone_bitmap);
+    bitmap_set1(data->ct_zone_bitmap, 0); /* Zone 0 is reserved. */
+    restore_ct_zones(bridge_table, ovs_table,
+                     &data->ct_zones, data->ct_zone_bitmap);
+}
+
+static void
+en_runtime_data_cleanup(struct engine_node *node)
+{
+    struct ed_type_runtime_data *data =
+        (struct ed_type_runtime_data *)node->data;
+
+    expr_const_sets_destroy(&data->addr_sets);
+    shash_destroy(&data->addr_sets);
+    expr_const_sets_destroy(&data->port_groups);
+    shash_destroy(&data->port_groups);
+
+    sset_destroy(&data->local_lports);
+    sset_destroy(&data->local_lport_ids);
+    sset_destroy(&data->active_tunnels);
+    struct local_datapath *cur_node, *next_node;
+    HMAP_FOR_EACH_SAFE (cur_node, next_node, hmap_node,
+                        &data->local_datapaths) {
+        free(cur_node->peer_dps);
+        hmap_remove(&data->local_datapaths, &cur_node->hmap_node);
+        free(cur_node);
+    }
+    hmap_destroy(&data->local_datapaths);
+
+    simap_destroy(&data->ct_zones);
+    shash_destroy(&data->pending_ct_zones);
+}
+
+static void
+en_runtime_data_run(struct engine_node *node)
+{
+    struct ed_type_runtime_data *data =
+        (struct ed_type_runtime_data *)node->data;
+    struct hmap *local_datapaths = &data->local_datapaths;
+    struct sset *local_lports = &data->local_lports;
+    struct sset *local_lport_ids = &data->local_lport_ids;
+    struct sset *active_tunnels = &data->active_tunnels;
+    struct shash *addr_sets = &data->addr_sets;
+    struct shash *port_groups = &data->port_groups;
+    unsigned long *ct_zone_bitmap = data->ct_zone_bitmap;
+    struct shash *pending_ct_zones = &data->pending_ct_zones;
+    struct simap *ct_zones = &data->ct_zones;
+
+    static bool first_run = true;
+    if (first_run) {
+        /* don't cleanup since there is no data yet */
+        first_run = false;
+    } else {
+        struct local_datapath *cur_node, *next_node;
+        HMAP_FOR_EACH_SAFE (cur_node, next_node, hmap_node, local_datapaths) {
+            free(cur_node->peer_dps);
+            hmap_remove(local_datapaths, &cur_node->hmap_node);
+            free(cur_node);
+        }
+        hmap_clear(local_datapaths);
+        sset_destroy(local_lports);
+        sset_destroy(local_lport_ids);
+        sset_destroy(active_tunnels);
+        expr_const_sets_destroy(addr_sets);
+        expr_const_sets_destroy(port_groups);
+        sset_init(local_lports);
+        sset_init(local_lport_ids);
+        sset_init(active_tunnels);
+    }
+
+    struct ovsrec_open_vswitch_table *ovs_table =
+        (struct ovsrec_open_vswitch_table *)EN_OVSDB_GET(
+            engine_get_input("OVS_open_vswitch", node));
+    struct ovsrec_bridge_table *bridge_table =
+        (struct ovsrec_bridge_table *)EN_OVSDB_GET(
+            engine_get_input("OVS_bridge", node));
+    const char *chassis_id = get_chassis_id(ovs_table);
+    const struct ovsrec_bridge *br_int = get_br_int(bridge_table, ovs_table);
+
+    ovs_assert(br_int && chassis_id);
+
+    struct ovsdb_idl_index *sbrec_chassis_by_name =
+        engine_ovsdb_node_get_index(
+                engine_get_input("SB_chassis", node),
+                "name");
+
+    const struct sbrec_chassis *chassis
+        = chassis_lookup_by_name(sbrec_chassis_by_name, chassis_id);
+    ovs_assert(chassis);
+
+    bfd_calculate_active_tunnels(br_int, active_tunnels);
+
+    struct ovsrec_port_table *port_table =
+        (struct ovsrec_port_table *)EN_OVSDB_GET(
+            engine_get_input("OVS_port", node));
+
+    struct ovsrec_qos_table *qos_table =
+        (struct ovsrec_qos_table *)EN_OVSDB_GET(
+            engine_get_input("OVS_qos", node));
+
+    struct sbrec_port_binding_table *pb_table =
+        (struct sbrec_port_binding_table *)EN_OVSDB_GET(
+            engine_get_input("SB_port_binding", node));
+
+    struct ovsdb_idl_index *sbrec_datapath_binding_by_key =
+        engine_ovsdb_node_get_index(
+                engine_get_input("SB_datapath_binding", node),
+                "key");
+
+    struct ovsdb_idl_index *sbrec_port_binding_by_name =
+        engine_ovsdb_node_get_index(
+                engine_get_input("SB_port_binding", node),
+                "name");
+
+    struct ovsdb_idl_index *sbrec_port_binding_by_datapath =
+        engine_ovsdb_node_get_index(
+                engine_get_input("SB_port_binding", node),
+                "datapath");
+
+    binding_run(engine_get_context()->ovnsb_idl_txn,
+                engine_get_context()->ovs_idl_txn,
+                sbrec_chassis_by_name,
+                sbrec_datapath_binding_by_key,
+                sbrec_port_binding_by_datapath,
+                sbrec_port_binding_by_name,
+                port_table, qos_table, pb_table,
+                br_int, chassis,
+                active_tunnels, local_datapaths,
+                local_lports, local_lport_ids);
+
+    struct sbrec_address_set_table *as_table =
+        (struct sbrec_address_set_table *)EN_OVSDB_GET(
+            engine_get_input("SB_address_set", node));
+    addr_sets_init(as_table, addr_sets);
+
+    struct sbrec_port_group_table *pg_table =
+        (struct sbrec_port_group_table *)EN_OVSDB_GET(
+            engine_get_input("SB_port_group", node));
+    port_groups_init(pg_table, port_groups);
+
+    update_ct_zones(local_lports, local_datapaths, ct_zones,
+                    ct_zone_bitmap, pending_ct_zones);
+
+    node->changed = true;
+}
+
+struct ed_type_flow_output {
+    /* desired flows */
+    struct hmap flow_table;
+    /* group ids for load balancing */
+    struct ovn_extend_table group_table;
+    /* meter ids for QoS */
+    struct ovn_extend_table meter_table;
+};
+
+static void
+en_flow_output_init(struct engine_node *node)
+{
+    struct ed_type_flow_output *data =
+        (struct ed_type_flow_output *)node->data;
+    hmap_init(&data->flow_table);
+    ovn_extend_table_init(&data->group_table);
+    ovn_extend_table_init(&data->meter_table);
+}
+
+static void
+en_flow_output_cleanup(struct engine_node *node)
+{
+    struct ed_type_flow_output *data =
+        (struct ed_type_flow_output *)node->data;
+    hmap_destroy(&data->flow_table);
+    ovn_extend_table_destroy(&data->group_table);
+    ovn_extend_table_destroy(&data->meter_table);
+}
+
+static void
+en_flow_output_run(struct engine_node *node)
+{
+    struct ed_type_runtime_data *rt_data =
+        (struct ed_type_runtime_data *)engine_get_input(
+            "runtime_data", node)->data;
+    struct hmap *local_datapaths = &rt_data->local_datapaths;
+    struct sset *local_lports = &rt_data->local_lports;
+    struct sset *local_lport_ids = &rt_data->local_lport_ids;
+    struct sset *active_tunnels = &rt_data->active_tunnels;
+    struct shash *addr_sets = &rt_data->addr_sets;
+    struct shash *port_groups = &rt_data->port_groups;
+    struct simap *ct_zones = &rt_data->ct_zones;
+
+    struct ovsrec_open_vswitch_table *ovs_table =
+        (struct ovsrec_open_vswitch_table *)EN_OVSDB_GET(
+            engine_get_input("OVS_open_vswitch", node));
+    struct ovsrec_bridge_table *bridge_table =
+        (struct ovsrec_bridge_table *)EN_OVSDB_GET(
+            engine_get_input("OVS_bridge", node));
+    const struct ovsrec_bridge *br_int = get_br_int(bridge_table, ovs_table);
+    const char *chassis_id = get_chassis_id(ovs_table);
+
+    struct ovsdb_idl_index *sbrec_chassis_by_name =
+        engine_ovsdb_node_get_index(
+                engine_get_input("SB_chassis", node),
+                "name");
+    const struct sbrec_chassis *chassis
+        = chassis_lookup_by_name(sbrec_chassis_by_name, chassis_id);
+
+    ovs_assert(br_int && chassis);
+
+    struct ed_type_flow_output *fo =
+        (struct ed_type_flow_output *)node->data;
+    struct hmap *flow_table = &fo->flow_table;
+    struct ovn_extend_table *group_table = &fo->group_table;
+    struct ovn_extend_table *meter_table = &fo->meter_table;
+
+    static bool first_run = true;
+    if (first_run) {
+        first_run = false;
+    } else {
+        hmap_clear(flow_table);
+    }
+
+    struct ovsdb_idl_index *sbrec_multicast_group_by_name_datapath =
+        engine_ovsdb_node_get_index(
+                engine_get_input("SB_multicast_group", node),
+                "name_datapath");
+
+    struct ovsdb_idl_index *sbrec_port_binding_by_name =
+        engine_ovsdb_node_get_index(
+                engine_get_input("SB_port_binding", node),
+                "name");
+
+    struct sbrec_dhcp_options_table *dhcp_table =
+        (struct sbrec_dhcp_options_table *)EN_OVSDB_GET(
+            engine_get_input("SB_dhcp_options", node));
+
+    struct sbrec_dhcpv6_options_table *dhcpv6_table =
+        (struct sbrec_dhcpv6_options_table *)EN_OVSDB_GET(
+            engine_get_input("SB_dhcpv6_options", node));
+
+    struct sbrec_logical_flow_table *logical_flow_table =
+        (struct sbrec_logical_flow_table *)EN_OVSDB_GET(
+            engine_get_input("SB_logical_flow", node));
+
+    struct sbrec_mac_binding_table *mac_binding_table =
+        (struct sbrec_mac_binding_table *)EN_OVSDB_GET(
+            engine_get_input("SB_mac_binding", node));
+
+    lflow_run(sbrec_chassis_by_name,
+              sbrec_multicast_group_by_name_datapath,
+              sbrec_port_binding_by_name,
+              dhcp_table, dhcpv6_table,
+              logical_flow_table,
+              mac_binding_table,
+              chassis, local_datapaths, addr_sets,
+              port_groups, active_tunnels, local_lport_ids,
+              flow_table, group_table, meter_table);
+
+    enum mf_field_id mff_ovn_geneve = ofctrl_get_mf_field_id();
+
+    struct sbrec_multicast_group_table *multicast_group_table =
+        (struct sbrec_multicast_group_table *)EN_OVSDB_GET(
+            engine_get_input("SB_multicast_group", node));
+
+    struct sbrec_port_binding_table *port_binding_table =
+        (struct sbrec_port_binding_table *)EN_OVSDB_GET(
+            engine_get_input("SB_port_binding", node));
+
+    physical_run(
+        sbrec_chassis_by_name,
+        sbrec_port_binding_by_name,
+        multicast_group_table,
+        port_binding_table,
+        mff_ovn_geneve,
+        br_int, chassis, ct_zones,
+        local_datapaths, local_lports,
+        active_tunnels,
+        flow_table);
+    node->changed = true;
+}
+
 int
 main(int argc, char *argv[])
 {
@@ -562,17 +947,9 @@  main(int argc, char *argv[])
     }
     unixctl_command_register("exit", "", 0, 0, ovn_controller_exit, &exiting);
 
-    /* Initialize group ids for loadbalancing. */
-    struct ovn_extend_table group_table;
-    ovn_extend_table_init(&group_table);
-
-    /* Initialize meter ids for QoS. */
-    struct ovn_extend_table meter_table;
-    ovn_extend_table_init(&meter_table);
 
     daemonize_complete();
 
-    ofctrl_init(&group_table, &meter_table);
     pinctrl_init();
     lflow_init();
 
@@ -613,26 +990,79 @@  main(int argc, char *argv[])
     update_sb_monitors(ovnsb_idl_loop.idl, NULL, NULL, NULL);
     ovsdb_idl_get_initial_snapshot(ovnsb_idl_loop.idl);
 
-    /* Initialize connection tracking zones. */
-    struct simap ct_zones = SIMAP_INITIALIZER(&ct_zones);
-    struct shash pending_ct_zones = SHASH_INITIALIZER(&pending_ct_zones);
-    unsigned long ct_zone_bitmap[BITMAP_N_LONGS(MAX_CT_ZONES)];
-    memset(ct_zone_bitmap, 0, sizeof ct_zone_bitmap);
-    bitmap_set1(ct_zone_bitmap, 0); /* Zone 0 is reserved. */
-    restore_ct_zones(ovsrec_bridge_table_get(ovs_idl_loop.idl),
-                     ovsrec_open_vswitch_table_get(ovs_idl_loop.idl),
-                     &ct_zones, ct_zone_bitmap);
+    stopwatch_create(CONTROLLER_LOOP_STOPWATCH_NAME, SW_MS);
+
+    struct ed_type_runtime_data ed_runtime_data;
+    struct ed_type_flow_output ed_flow_output;
+
+#define SB_NODE(NAME, NAME_STR) ENGINE_NODE_SB(NAME, NAME_STR);
+    SB_NODES
+#undef SB_NODE
+
+#define OVS_NODE(NAME, NAME_STR) ENGINE_NODE_OVS(NAME, NAME_STR);
+    OVS_NODES
+#undef OVS_NODE
+
+    engine_ovsdb_node_add_index(&en_sb_chassis, "name", sbrec_chassis_by_name);
+    engine_ovsdb_node_add_index(&en_sb_multicast_group, "name_datapath", sbrec_multicast_group_by_name_datapath);
+    engine_ovsdb_node_add_index(&en_sb_port_binding, "name", sbrec_port_binding_by_name);
+    engine_ovsdb_node_add_index(&en_sb_port_binding, "key", sbrec_port_binding_by_key);
+    engine_ovsdb_node_add_index(&en_sb_port_binding, "datapath", sbrec_port_binding_by_datapath);
+    engine_ovsdb_node_add_index(&en_sb_datapath_binding, "key", sbrec_datapath_binding_by_key);
+
+    ENGINE_NODE(runtime_data, "runtime_data");
+    ENGINE_NODE(flow_output, "flow_output");
+    engine_add_input(&en_flow_output, &en_runtime_data, NULL);
+
+    engine_add_input(&en_flow_output, &en_ovs_open_vswitch, NULL);
+    engine_add_input(&en_flow_output, &en_ovs_bridge, NULL);
+    engine_add_input(&en_flow_output, &en_ovs_port, NULL);
+    engine_add_input(&en_flow_output, &en_ovs_interface, NULL);
+
+    engine_add_input(&en_flow_output, &en_sb_chassis, NULL);
+    engine_add_input(&en_flow_output, &en_sb_encap, NULL);
+    engine_add_input(&en_flow_output, &en_sb_address_set, NULL);
+    engine_add_input(&en_flow_output, &en_sb_port_group, NULL);
+    engine_add_input(&en_flow_output, &en_sb_multicast_group, NULL);
+    engine_add_input(&en_flow_output, &en_sb_datapath_binding, NULL);
+    engine_add_input(&en_flow_output, &en_sb_port_binding, NULL);
+    engine_add_input(&en_flow_output, &en_sb_mac_binding, NULL);
+    engine_add_input(&en_flow_output, &en_sb_logical_flow, NULL);
+    engine_add_input(&en_flow_output, &en_sb_dhcp_options, NULL);
+    engine_add_input(&en_flow_output, &en_sb_dhcpv6_options, NULL);
+    engine_add_input(&en_flow_output, &en_sb_dns, NULL);
+    engine_add_input(&en_flow_output, &en_sb_gateway_chassis, NULL);
+
+    engine_add_input(&en_runtime_data, &en_ovs_open_vswitch, NULL);
+    engine_add_input(&en_runtime_data, &en_ovs_bridge, NULL);
+    engine_add_input(&en_runtime_data, &en_ovs_port, NULL);
+    engine_add_input(&en_runtime_data, &en_ovs_qos, NULL);
+    engine_add_input(&en_runtime_data, &en_ovs_interface, NULL);
+
+    engine_add_input(&en_runtime_data, &en_sb_chassis, NULL);
+    engine_add_input(&en_runtime_data, &en_sb_address_set, NULL);
+    engine_add_input(&en_runtime_data, &en_sb_port_group, NULL);
+    engine_add_input(&en_runtime_data, &en_sb_datapath_binding, NULL);
+    engine_add_input(&en_runtime_data, &en_sb_port_binding, NULL);
+    engine_add_input(&en_runtime_data, &en_sb_gateway_chassis, NULL);
+
+    engine_init(&en_flow_output);
+
+    ofctrl_init(&ed_flow_output.group_table,
+                &ed_flow_output.meter_table);
     unixctl_command_register("ct-zone-list", "", 0, 0,
-                             ct_zone_list, &ct_zones);
+                             ct_zone_list, &ed_runtime_data.ct_zones);
 
     struct pending_pkt pending_pkt = { .conn = NULL };
     unixctl_command_register("inject-pkt", "MICROFLOW", 1, 1, inject_pkt,
                              &pending_pkt);
 
-    stopwatch_create(CONTROLLER_LOOP_STOPWATCH_NAME, SW_MS);
+    uint64_t engine_run_id = 0;
+    uint64_t old_engine_run_id = 0;
     /* Main loop. */
     exiting = false;
     while (!exiting) {
+        old_engine_run_id = engine_run_id;
         /* Check OVN SB database. */
         char *new_ovnsb_remote = get_ovnsb_remote(ovs_idl_loop.idl);
         if (strcmp(ovnsb_remote, new_ovnsb_remote)) {
@@ -647,59 +1077,54 @@  main(int argc, char *argv[])
         struct ovsdb_idl_txn *ovnsb_idl_txn
             = ovsdb_idl_loop_run(&ovnsb_idl_loop);
 
+        struct engine_context eng_ctx = {
+            .ovs_idl_txn = ovs_idl_txn,
+            .ovnsb_idl_txn = ovnsb_idl_txn
+        };
+
+        engine_set_context(&eng_ctx);
+
         update_probe_interval(ovsrec_open_vswitch_table_get(ovs_idl_loop.idl),
                               ovnsb_remote, ovnsb_idl_loop.idl);
 
         update_ssl_config(ovsrec_ssl_table_get(ovs_idl_loop.idl));
 
-        /* Contains "struct local_datapath" nodes. */
-        struct hmap local_datapaths = HMAP_INITIALIZER(&local_datapaths);
-
-        /* Contains the name of each logical port resident on the local
-         * hypervisor.  These logical ports include the VIFs (and their child
-         * logical ports, if any) that belong to VMs running on the hypervisor,
-         * l2gateway ports for which options:l2gateway-chassis designates the
-         * local hypervisor, and localnet ports. */
-        struct sset local_lports = SSET_INITIALIZER(&local_lports);
-        /* Contains the same ports as local_lports, but in the format:
-         * <datapath-tunnel-key>_<port-tunnel-key> */
-        struct sset local_lport_ids = SSET_INITIALIZER(&local_lport_ids);
-        struct sset active_tunnels = SSET_INITIALIZER(&active_tunnels);
-
-        const struct ovsrec_bridge *br_int
-            = get_br_int(ovs_idl_txn,
-                         ovsrec_bridge_table_get(ovs_idl_loop.idl),
-                         ovsrec_open_vswitch_table_get(ovs_idl_loop.idl));
-        const char *chassis_id
-            = get_chassis_id(ovsrec_open_vswitch_table_get(ovs_idl_loop.idl));
-
+        const struct ovsrec_bridge_table *bridge_table =
+            ovsrec_bridge_table_get(ovs_idl_loop.idl);
+        const struct ovsrec_open_vswitch_table *ovs_table =
+            ovsrec_open_vswitch_table_get(ovs_idl_loop.idl);
+        const struct ovsrec_bridge *br_int = get_br_int(bridge_table,
+                                                        ovs_table);
+        if (!br_int) {
+            br_int = create_br_int(ovs_idl_txn, ovs_table);
+        }
+        const char *chassis_id = get_chassis_id(ovs_table);
         const struct sbrec_chassis *chassis = NULL;
         if (chassis_id) {
-            chassis = chassis_run(ovnsb_idl_txn, sbrec_chassis_by_name,
-                                  ovsrec_open_vswitch_table_get(ovs_idl_loop.idl),
-                                  chassis_id, br_int);
-            encaps_run(ovs_idl_txn,
-                       ovsrec_bridge_table_get(ovs_idl_loop.idl), br_int,
-                       sbrec_chassis_table_get(ovnsb_idl_loop.idl), chassis_id);
-            bfd_calculate_active_tunnels(br_int, &active_tunnels);
-            binding_run(ovnsb_idl_txn, ovs_idl_txn, sbrec_chassis_by_name,
-                        sbrec_datapath_binding_by_key,
-                        sbrec_port_binding_by_datapath,
-                        sbrec_port_binding_by_name,
-                        ovsrec_port_table_get(ovs_idl_loop.idl),
-                        ovsrec_qos_table_get(ovs_idl_loop.idl),
-                        sbrec_port_binding_table_get(ovnsb_idl_loop.idl),
-                        br_int, chassis,
-                        &active_tunnels, &local_datapaths,
-                        &local_lports, &local_lport_ids);
+            const struct sbrec_chassis *chassis_ = chassis_run(
+                    ovnsb_idl_txn, sbrec_chassis_by_name, ovs_table,
+                    chassis_id, br_int);
+
+            /* XXX: Query again using index instead of using return value
+             * directly from chassis_run(), because currently index
+             * implementation is not able to get data that is inserted in
+             * current transaction. We don't want to continue processing
+             * in current main loop if chassis can't be found with index
+             * since we are relying on this in the engine node processing.
+             */
+            if (chassis_) {
+                chassis = chassis_lookup_by_name(sbrec_chassis_by_name,
+                                                 chassis_id);
+                if (!chassis) {
+                    VLOG_DBG("Chassis created but not found by index. run-id: "
+                              "%"PRIu64, engine_run_id);
+                    poll_immediate_wake();
+                }
+            }
         }
+
         if (br_int && chassis) {
-            struct shash addr_sets = SHASH_INITIALIZER(&addr_sets);
-            addr_sets_init(sbrec_address_set_table_get(ovnsb_idl_loop.idl),
-                           &addr_sets);
-            struct shash port_groups = SHASH_INITIALIZER(&port_groups);
-            port_groups_init(sbrec_port_group_table_get(ovnsb_idl_loop.idl),
-                             &port_groups);
+            ofctrl_run(br_int, &ed_runtime_data.pending_ct_zones);
 
             patch_run(ovs_idl_txn,
                       ovsrec_bridge_table_get(ovs_idl_loop.idl),
@@ -707,9 +1132,33 @@  main(int argc, char *argv[])
                       ovsrec_port_table_get(ovs_idl_loop.idl),
                       sbrec_port_binding_table_get(ovnsb_idl_loop.idl),
                       br_int, chassis);
-
-            enum mf_field_id mff_ovn_geneve = ofctrl_run(br_int,
-                                                         &pending_ct_zones);
+            encaps_run(ovs_idl_txn,
+                       bridge_table, br_int,
+                       sbrec_chassis_table_get(ovnsb_idl_loop.idl),
+                       chassis_id);
+
+            if (ofctrl_can_put()) {
+                stopwatch_start(CONTROLLER_LOOP_STOPWATCH_NAME,
+                                time_msec());
+                engine_run(&en_flow_output, ++engine_run_id);
+                stopwatch_stop(CONTROLLER_LOOP_STOPWATCH_NAME,
+                               time_msec());
+
+                if (ovs_idl_txn) {
+                    commit_ct_zones(br_int, &ed_runtime_data.pending_ct_zones);
+                    bfd_run(sbrec_chassis_by_name,
+                            sbrec_port_binding_by_datapath,
+                            ovsrec_interface_table_get(ovs_idl_loop.idl),
+                            br_int, chassis,
+                            &ed_runtime_data.local_datapaths);
+                }
+                if (en_flow_output.changed) {
+                    ofctrl_put(&ed_flow_output.flow_table,
+                               &ed_runtime_data.pending_ct_zones,
+                               get_nb_cfg(sbrec_sb_global_table_get(
+                                              ovnsb_idl_loop.idl)));
+                }
+            }
 
             pinctrl_run(ovnsb_idl_txn, sbrec_chassis_by_name,
                         sbrec_datapath_binding_by_key,
@@ -719,106 +1168,58 @@  main(int argc, char *argv[])
                         sbrec_dns_table_get(ovnsb_idl_loop.idl),
                         sbrec_mac_binding_table_get(ovnsb_idl_loop.idl),
                         br_int, chassis,
-                        &local_datapaths, &active_tunnels);
-            update_ct_zones(&local_lports, &local_datapaths, &ct_zones,
-                            ct_zone_bitmap, &pending_ct_zones);
-            if (ovs_idl_txn) {
-                if (ofctrl_can_put()) {
-                    stopwatch_start(CONTROLLER_LOOP_STOPWATCH_NAME,
-                                    time_msec());
-
-                    commit_ct_zones(br_int, &pending_ct_zones);
-
-                    struct hmap flow_table = HMAP_INITIALIZER(&flow_table);
-                    lflow_run(sbrec_chassis_by_name,
-                              sbrec_multicast_group_by_name_datapath,
-                              sbrec_port_binding_by_name,
-                              sbrec_dhcp_options_table_get(ovnsb_idl_loop.idl),
-                              sbrec_dhcpv6_options_table_get(ovnsb_idl_loop.idl),
-                              sbrec_logical_flow_table_get(ovnsb_idl_loop.idl),
-                              sbrec_mac_binding_table_get(ovnsb_idl_loop.idl),
-                              chassis,
-                              &local_datapaths, &addr_sets,
-                              &port_groups, &active_tunnels, &local_lport_ids,
-                              &flow_table, &group_table, &meter_table);
-
-                    if (chassis_id) {
-                        bfd_run(sbrec_chassis_by_name,
-                                sbrec_port_binding_by_datapath,
-                                ovsrec_interface_table_get(ovs_idl_loop.idl),
-                                br_int, chassis, &local_datapaths);
-                    }
-                    physical_run(
-                        sbrec_chassis_by_name,
-                        sbrec_port_binding_by_name,
-                        sbrec_multicast_group_table_get(ovnsb_idl_loop.idl),
-                        sbrec_port_binding_table_get(ovnsb_idl_loop.idl),
-                        mff_ovn_geneve,
-                        br_int, chassis, &ct_zones,
-                        &local_datapaths, &local_lports,
-                        &active_tunnels,
-                        &flow_table);
-
-                    stopwatch_stop(CONTROLLER_LOOP_STOPWATCH_NAME,
-                                   time_msec());
-
-                    ofctrl_put(&flow_table, &pending_ct_zones,
-                               get_nb_cfg(sbrec_sb_global_table_get(
-                                              ovnsb_idl_loop.idl)));
+                        &ed_runtime_data.local_datapaths,
+                        &ed_runtime_data.active_tunnels);
 
-                    hmap_destroy(&flow_table);
-                }
-                if (ovnsb_idl_txn) {
-                    int64_t cur_cfg = ofctrl_get_cur_cfg();
-                    if (cur_cfg && cur_cfg != chassis->nb_cfg) {
-                        sbrec_chassis_set_nb_cfg(chassis, cur_cfg);
-                    }
-                }
+            update_sb_monitors(ovnsb_idl_loop.idl, chassis,
+                               &ed_runtime_data.local_lports,
+                               &ed_runtime_data.local_datapaths);
+
+        }
+        if (old_engine_run_id == engine_run_id) {
+            if (engine_need_run(&en_flow_output)) {
+                VLOG_DBG("engine did not run, force recompute next time: "
+                         "br_int %p, chassis %p", br_int, chassis);
+                engine_set_force_recompute(true);
+                poll_immediate_wake();
+            } else {
+                VLOG_DBG("engine did not run, and it was not needed either: "
+                         "br_int %p, chassis %p", br_int, chassis);
+            }
+        } else {
+            engine_set_force_recompute(false);
+        }
+
+        if (ovnsb_idl_txn && chassis) {
+            int64_t cur_cfg = ofctrl_get_cur_cfg();
+            if (cur_cfg && cur_cfg != chassis->nb_cfg) {
+                sbrec_chassis_set_nb_cfg(chassis, cur_cfg);
             }
+        }
+
 
-            if (pending_pkt.conn) {
+        if (pending_pkt.conn) {
+            if (br_int && chassis) {
                 char *error = ofctrl_inject_pkt(br_int, pending_pkt.flow_s,
-                                                &port_groups, &addr_sets);
+                                                &ed_runtime_data.port_groups,
+                                                &ed_runtime_data.addr_sets);
                 if (error) {
                     unixctl_command_reply_error(pending_pkt.conn, error);
                     free(error);
                 } else {
                     unixctl_command_reply(pending_pkt.conn, NULL);
                 }
-                pending_pkt.conn = NULL;
-                free(pending_pkt.flow_s);
+            } else {
+                VLOG_DBG("Pending_pkt conn but br_int %p or chassis %p not"
+                          " ready. run-id: %"PRIu64, br_int, chassis,
+                          engine_run_id);
+                unixctl_command_reply_error(pending_pkt.conn,
+                                            "ovn-controller not ready.");
             }
-
-            update_sb_monitors(ovnsb_idl_loop.idl, chassis,
-                               &local_lports, &local_datapaths);
-
-            expr_const_sets_destroy(&addr_sets);
-            shash_destroy(&addr_sets);
-            expr_const_sets_destroy(&port_groups);
-            shash_destroy(&port_groups);
-        }
-
-        /* If we haven't handled the pending packet insertion
-         * request, the system is not ready. */
-        if (pending_pkt.conn) {
-            unixctl_command_reply_error(pending_pkt.conn,
-                                        "ovn-controller not ready.");
             pending_pkt.conn = NULL;
             free(pending_pkt.flow_s);
         }
 
-        sset_destroy(&local_lports);
-        sset_destroy(&local_lport_ids);
-        sset_destroy(&active_tunnels);
-
-        struct local_datapath *cur_node, *next_node;
-        HMAP_FOR_EACH_SAFE (cur_node, next_node, hmap_node, &local_datapaths) {
-            free(cur_node->peer_dps);
-            hmap_remove(&local_datapaths, &cur_node->hmap_node);
-            free(cur_node);
-        }
-        hmap_destroy(&local_datapaths);
-
         unixctl_server_run(unixctl);
 
         unixctl_server_wait(unixctl);
@@ -835,23 +1236,27 @@  main(int argc, char *argv[])
 
         if (ovsdb_idl_loop_commit_and_wait(&ovs_idl_loop) == 1) {
             struct shash_node *iter, *iter_next;
-            SHASH_FOR_EACH_SAFE(iter, iter_next, &pending_ct_zones) {
+            SHASH_FOR_EACH_SAFE (iter, iter_next,
+                                 &ed_runtime_data.pending_ct_zones) {
                 struct ct_zone_pending_entry *ctzpe = iter->data;
                 if (ctzpe->state == CT_ZONE_DB_SENT) {
-                    shash_delete(&pending_ct_zones, iter);
+                    shash_delete(&ed_runtime_data.pending_ct_zones, iter);
                     free(ctzpe);
                 }
             }
         }
 
-        ovsdb_idl_track_clear(ctx.ovnsb_idl);
-        ovsdb_idl_track_clear(ctx.ovs_idl);
+        ovsdb_idl_track_clear(ovnsb_idl_loop.idl);
+        ovsdb_idl_track_clear(ovs_idl_loop.idl);
         poll_block();
         if (should_service_stop()) {
             exiting = true;
         }
     }
 
+    engine_set_context(NULL);
+    engine_cleanup(&en_flow_output);
+
     /* It's time to exit.  Clean up the databases. */
     bool done = false;
     while (!done) {
@@ -867,8 +1272,7 @@  main(int argc, char *argv[])
         const struct sbrec_port_binding_table *port_binding_table
             = sbrec_port_binding_table_get(ovnsb_idl_loop.idl);
 
-        const struct ovsrec_bridge *br_int = get_br_int(ovs_idl_txn,
-                                                        bridge_table,
+        const struct ovsrec_bridge *br_int = get_br_int(bridge_table,
                                                         ovs_table);
         const char *chassis_id = get_chassis_id(ovs_table);
         const struct sbrec_chassis *chassis
@@ -895,12 +1299,6 @@  main(int argc, char *argv[])
     ofctrl_destroy();
     pinctrl_destroy();
 
-    simap_destroy(&ct_zones);
-    shash_destroy(&pending_ct_zones);
-
-    ovn_extend_table_destroy(&group_table);
-    ovn_extend_table_destroy(&meter_table);
-
     ovsdb_idl_loop_destroy(&ovs_idl_loop);
     ovsdb_idl_loop_destroy(&ovnsb_idl_loop);