[ovs-dev,v3,ovn,4/4] ovn-controller: Fix use of dangling pointers in I-P runtime_data.
diff mbox series

Message ID 20191114170605.30391.18942.stgit@dceara.remote.csb
State Superseded
Headers show
Series
  • [ovs-dev,v3,ovn,1/4] ovn-controller: Refactor I-P engine_run() tracking.
Related show

Commit Message

Dumitru Ceara Nov. 14, 2019, 5:06 p.m. UTC
The incremental processing engine might stop a run before the
en_runtime_data node is processed. In such cases the ed_runtime_data
fields might contain pointers to already deleted SB records. For
example, if a port binding corresponding to a patch port is removed from
the SB database and the incremental processing engine aborts before the
en_runtime_data node is processed then the corresponding local_datapath
hashtable entry in ed_runtime_data is stale and will store a pointer to
the already freed sbrec_port_binding record.

This will cause invalid memory accesses in various places (e.g.,
pinctrl_run() -> prepare_ipv6_ras()).

To fix the issue we introduce the concept of "internal_data" vs "data" in
engine nodes. The first field, "internal_data", is data that can be accessed
by the incremental engine nodes handlers (data from other nodes must be
considered read-only and data from other nodes must not be accessed if the
nodes haven't been refreshed in the current iteration). The second field,
"data" is a pointer reset at engine_run() and if non-NULL indicates to
users outside the incremental engine that the data is safe to use.

This commit also adds an "is_valid()" method to engine nodes to allow
users to override the default behavior of determining if data is valid in a
node (e.g., for the ct-zones node the data is always safe to access).

CC: Han Zhou <hzhou8@ebay.com>
Fixes: ca278d98a4f5 ("ovn-controller: Initial use of incremental engine - quiet mode.")
Signed-off-by: Dumitru Ceara <dceara@redhat.com>

---
v3:
- split the change in series.
- Address Han's comments:
    - fix the data encapsulation issue.
    - add is_valid method to nodes.
    - add internal_data/data fields to nodes as it makes it easier to write
      the code instead of adding an "engine_get_data()" API.
v2: Address Han's comments:
- call engine_node_valid() in all the places where node local data is
  used.
- move out "global" data outside the engine nodes. Make a clear
  separation between data that can be safely used at any time and data
  that can be used only when the engine run was successful.
- add a debug log for iterations when the engine didn't run.
- refactor a bit more the incremental engine code.
---
 controller/ovn-controller.c |  227 ++++++++++++++++++++++---------------------
 lib/inc-proc-eng.c          |   25 ++++-
 lib/inc-proc-eng.h          |   28 +++++
 3 files changed, 162 insertions(+), 118 deletions(-)

Patch
diff mbox series

diff --git a/controller/ovn-controller.c b/controller/ovn-controller.c
index 51f466a..0475a30 100644
--- a/controller/ovn-controller.c
+++ b/controller/ovn-controller.c
@@ -741,8 +741,7 @@  struct ed_type_ofctrl_is_connected {
 static void
 en_ofctrl_is_connected_init(struct engine_node *node)
 {
-    struct ed_type_ofctrl_is_connected *data =
-        (struct ed_type_ofctrl_is_connected *)node->data;
+    struct ed_type_ofctrl_is_connected *data = node->internal_data;
     data->connected = false;
 }
 
@@ -754,8 +753,7 @@  en_ofctrl_is_connected_cleanup(struct engine_node *node OVS_UNUSED)
 static void
 en_ofctrl_is_connected_run(struct engine_node *node)
 {
-    struct ed_type_ofctrl_is_connected *data =
-        (struct ed_type_ofctrl_is_connected *)node->data;
+    struct ed_type_ofctrl_is_connected *data = node->internal_data;
     if (data->connected != ofctrl_is_connected()) {
         data->connected = !data->connected;
         engine_set_node_state(node, EN_UPDATED);
@@ -775,7 +773,7 @@  struct ed_type_addr_sets {
 static void
 en_addr_sets_init(struct engine_node *node)
 {
-    struct ed_type_addr_sets *as = (struct ed_type_addr_sets *)node->data;
+    struct ed_type_addr_sets *as = node->internal_data;
     shash_init(&as->addr_sets);
     as->change_tracked = false;
     sset_init(&as->new);
@@ -786,7 +784,7 @@  en_addr_sets_init(struct engine_node *node)
 static void
 en_addr_sets_cleanup(struct engine_node *node)
 {
-    struct ed_type_addr_sets *as = (struct ed_type_addr_sets *)node->data;
+    struct ed_type_addr_sets *as = node->internal_data;
     expr_const_sets_destroy(&as->addr_sets);
     shash_destroy(&as->addr_sets);
     sset_destroy(&as->new);
@@ -797,7 +795,7 @@  en_addr_sets_cleanup(struct engine_node *node)
 static void
 en_addr_sets_run(struct engine_node *node)
 {
-    struct ed_type_addr_sets *as = (struct ed_type_addr_sets *)node->data;
+    struct ed_type_addr_sets *as = node->internal_data;
 
     sset_clear(&as->new);
     sset_clear(&as->deleted);
@@ -817,7 +815,7 @@  en_addr_sets_run(struct engine_node *node)
 static bool
 addr_sets_sb_address_set_handler(struct engine_node *node)
 {
-    struct ed_type_addr_sets *as = (struct ed_type_addr_sets *)node->data;
+    struct ed_type_addr_sets *as = node->internal_data;
 
     sset_clear(&as->new);
     sset_clear(&as->deleted);
@@ -852,7 +850,7 @@  struct ed_type_port_groups{
 static void
 en_port_groups_init(struct engine_node *node)
 {
-    struct ed_type_port_groups *pg = (struct ed_type_port_groups *)node->data;
+    struct ed_type_port_groups *pg = node->internal_data;
     shash_init(&pg->port_groups);
     pg->change_tracked = false;
     sset_init(&pg->new);
@@ -863,7 +861,7 @@  en_port_groups_init(struct engine_node *node)
 static void
 en_port_groups_cleanup(struct engine_node *node)
 {
-    struct ed_type_port_groups *pg = (struct ed_type_port_groups *)node->data;
+    struct ed_type_port_groups *pg = node->internal_data;
     expr_const_sets_destroy(&pg->port_groups);
     shash_destroy(&pg->port_groups);
     sset_destroy(&pg->new);
@@ -874,7 +872,7 @@  en_port_groups_cleanup(struct engine_node *node)
 static void
 en_port_groups_run(struct engine_node *node)
 {
-    struct ed_type_port_groups *pg = (struct ed_type_port_groups *)node->data;
+    struct ed_type_port_groups *pg = node->internal_data;
 
     sset_clear(&pg->new);
     sset_clear(&pg->deleted);
@@ -894,7 +892,7 @@  en_port_groups_run(struct engine_node *node)
 static bool
 port_groups_sb_port_group_handler(struct engine_node *node)
 {
-    struct ed_type_port_groups *pg = (struct ed_type_port_groups *)node->data;
+    struct ed_type_port_groups *pg = node->internal_data;
 
     sset_clear(&pg->new);
     sset_clear(&pg->deleted);
@@ -938,8 +936,7 @@  struct ed_type_runtime_data {
 static void
 en_runtime_data_init(struct engine_node *node)
 {
-    struct ed_type_runtime_data *data =
-        (struct ed_type_runtime_data *)node->data;
+    struct ed_type_runtime_data *data = node->internal_data;
 
     hmap_init(&data->local_datapaths);
     sset_init(&data->local_lports);
@@ -950,8 +947,7 @@  en_runtime_data_init(struct engine_node *node)
 static void
 en_runtime_data_cleanup(struct engine_node *node)
 {
-    struct ed_type_runtime_data *data =
-        (struct ed_type_runtime_data *)node->data;
+    struct ed_type_runtime_data *data = node->internal_data;
 
     sset_destroy(&data->local_lports);
     sset_destroy(&data->local_lport_ids);
@@ -970,8 +966,7 @@  en_runtime_data_cleanup(struct engine_node *node)
 static void
 en_runtime_data_run(struct engine_node *node)
 {
-    struct ed_type_runtime_data *data =
-        (struct ed_type_runtime_data *)node->data;
+    struct ed_type_runtime_data *data = node->internal_data;
     struct hmap *local_datapaths = &data->local_datapaths;
     struct sset *local_lports = &data->local_lports;
     struct sset *local_lport_ids = &data->local_lport_ids;
@@ -1019,8 +1014,7 @@  en_runtime_data_run(struct engine_node *node)
     ovs_assert(chassis);
 
     struct ed_type_ofctrl_is_connected *ed_ofctrl_is_connected =
-        (struct ed_type_ofctrl_is_connected *)engine_get_input(
-            "ofctrl_is_connected", node)->data;
+        engine_get_input("ofctrl_is_connected", node)->internal_data;
     if (ed_ofctrl_is_connected->connected) {
         /* Calculate the active tunnels only if have an an active
          * OpenFlow connection to br-int.
@@ -1076,8 +1070,7 @@  en_runtime_data_run(struct engine_node *node)
 static bool
 runtime_data_sb_port_binding_handler(struct engine_node *node)
 {
-    struct ed_type_runtime_data *data =
-        (struct ed_type_runtime_data *)node->data;
+    struct ed_type_runtime_data *data = node->internal_data;
     struct sset *local_lports = &data->local_lports;
     struct sset *active_tunnels = &data->active_tunnels;
 
@@ -1121,7 +1114,7 @@  struct ed_type_ct_zones {
 static void
 en_ct_zones_init(struct engine_node *node)
 {
-    struct ed_type_ct_zones *data = node->data;
+    struct ed_type_ct_zones *data = node->internal_data;
     struct ovsrec_open_vswitch_table *ovs_table =
         (struct ovsrec_open_vswitch_table *)EN_OVSDB_GET(
             engine_get_input("OVS_open_vswitch", node));
@@ -1140,7 +1133,7 @@  en_ct_zones_init(struct engine_node *node)
 static void
 en_ct_zones_cleanup(struct engine_node *node)
 {
-    struct ed_type_ct_zones *data = node->data;
+    struct ed_type_ct_zones *data = node->internal_data;
 
     simap_destroy(&data->current);
     shash_destroy(&data->pending);
@@ -1149,10 +1142,9 @@  en_ct_zones_cleanup(struct engine_node *node)
 static void
 en_ct_zones_run(struct engine_node *node)
 {
-    struct ed_type_ct_zones *data = node->data;
+    struct ed_type_ct_zones *data = node->internal_data;
     struct ed_type_runtime_data *rt_data =
-        (struct ed_type_runtime_data *)engine_get_input(
-            "runtime_data", node)->data;
+        engine_get_input("runtime_data", node)->internal_data;
 
     update_ct_zones(&rt_data->local_lports, &rt_data->local_datapaths,
                     &data->current, data->bitmap, &data->pending);
@@ -1160,6 +1152,13 @@  en_ct_zones_run(struct engine_node *node)
     engine_set_node_state(node, EN_UPDATED);
 }
 
+/* The data in the ct_zones node is always valid (i.e., no stale pointers). */
+static bool
+en_ct_zones_is_valid(struct engine_node *node OVS_UNUSED)
+{
+    return true;
+}
+
 struct ed_type_mff_ovn_geneve {
     enum mf_field_id mff_ovn_geneve;
 };
@@ -1167,8 +1166,7 @@  struct ed_type_mff_ovn_geneve {
 static void
 en_mff_ovn_geneve_init(struct engine_node *node)
 {
-    struct ed_type_mff_ovn_geneve *data =
-        (struct ed_type_mff_ovn_geneve *)node->data;
+    struct ed_type_mff_ovn_geneve *data = node->internal_data;
     data->mff_ovn_geneve = 0;
 }
 
@@ -1180,8 +1178,7 @@  en_mff_ovn_geneve_cleanup(struct engine_node *node OVS_UNUSED)
 static void
 en_mff_ovn_geneve_run(struct engine_node *node)
 {
-    struct ed_type_mff_ovn_geneve *data =
-        (struct ed_type_mff_ovn_geneve *)node->data;
+    struct ed_type_mff_ovn_geneve *data = node->internal_data;
     enum mf_field_id mff_ovn_geneve = ofctrl_get_mf_field_id();
     if (data->mff_ovn_geneve != mff_ovn_geneve) {
         data->mff_ovn_geneve = mff_ovn_geneve;
@@ -1207,8 +1204,7 @@  struct ed_type_flow_output {
 static void
 en_flow_output_init(struct engine_node *node)
 {
-    struct ed_type_flow_output *data =
-        (struct ed_type_flow_output *)node->data;
+    struct ed_type_flow_output *data = node->internal_data;
     ovn_desired_flow_table_init(&data->flow_table);
     ovn_extend_table_init(&data->group_table);
     ovn_extend_table_init(&data->meter_table);
@@ -1219,8 +1215,7 @@  en_flow_output_init(struct engine_node *node)
 static void
 en_flow_output_cleanup(struct engine_node *node)
 {
-    struct ed_type_flow_output *data =
-        (struct ed_type_flow_output *)node->data;
+    struct ed_type_flow_output *data = node->internal_data;
     ovn_desired_flow_table_destroy(&data->flow_table);
     ovn_extend_table_destroy(&data->group_table);
     ovn_extend_table_destroy(&data->meter_table);
@@ -1231,21 +1226,18 @@  static void
 en_flow_output_run(struct engine_node *node)
 {
     struct ed_type_runtime_data *rt_data =
-        (struct ed_type_runtime_data *)engine_get_input(
-            "runtime_data", node)->data;
+        engine_get_input("runtime_data", node)->internal_data;
     struct hmap *local_datapaths = &rt_data->local_datapaths;
     struct sset *local_lports = &rt_data->local_lports;
     struct sset *local_lport_ids = &rt_data->local_lport_ids;
     struct sset *active_tunnels = &rt_data->active_tunnels;
 
     struct ed_type_ct_zones *ct_zones_data =
-        (struct ed_type_ct_zones *)engine_get_input(
-            "ct_zones", node)->data;
+        engine_get_input("ct_zones", node)->internal_data;
     struct simap *ct_zones = &ct_zones_data->current;
 
     struct ed_type_mff_ovn_geneve *ed_mff_ovn_geneve =
-        (struct ed_type_mff_ovn_geneve *)engine_get_input(
-            "mff_ovn_geneve", node)->data;
+        engine_get_input("mff_ovn_geneve", node)->internal_data;
     enum mf_field_id mff_ovn_geneve = ed_mff_ovn_geneve->mff_ovn_geneve;
 
     struct ovsrec_open_vswitch_table *ovs_table =
@@ -1262,12 +1254,11 @@  en_flow_output_run(struct engine_node *node)
                 engine_get_input("SB_chassis", node),
                 "name");
     struct ed_type_addr_sets *as_data =
-        (struct ed_type_addr_sets *)engine_get_input("addr_sets", node)->data;
+        engine_get_input("addr_sets", node)->internal_data;
     struct shash *addr_sets = &as_data->addr_sets;
 
     struct ed_type_port_groups *pg_data =
-        (struct ed_type_port_groups *)engine_get_input(
-            "port_groups", node)->data;
+        engine_get_input("port_groups", node)->internal_data;
     struct shash *port_groups = &pg_data->port_groups;
 
     const struct sbrec_chassis *chassis = NULL;
@@ -1277,8 +1268,7 @@  en_flow_output_run(struct engine_node *node)
 
     ovs_assert(br_int && chassis);
 
-    struct ed_type_flow_output *fo =
-        (struct ed_type_flow_output *)node->data;
+    struct ed_type_flow_output *fo = node->internal_data;
     struct ovn_desired_flow_table *flow_table = &fo->flow_table;
     struct ovn_extend_table *group_table = &fo->group_table;
     struct ovn_extend_table *meter_table = &fo->meter_table;
@@ -1361,18 +1351,16 @@  static bool
 flow_output_sb_logical_flow_handler(struct engine_node *node)
 {
     struct ed_type_runtime_data *data =
-        (struct ed_type_runtime_data *)engine_get_input(
-                "runtime_data", node)->data;
+        engine_get_input("runtime_data", node)->internal_data;
     struct hmap *local_datapaths = &data->local_datapaths;
     struct sset *local_lport_ids = &data->local_lport_ids;
     struct sset *active_tunnels = &data->active_tunnels;
     struct ed_type_addr_sets *as_data =
-        (struct ed_type_addr_sets *)engine_get_input("addr_sets", node)->data;
+        engine_get_input("addr_sets", node)->internal_data;
     struct shash *addr_sets = &as_data->addr_sets;
 
     struct ed_type_port_groups *pg_data =
-        (struct ed_type_port_groups *)engine_get_input(
-            "port_groups", node)->data;
+        engine_get_input("port_groups", node)->internal_data;
     struct shash *port_groups = &pg_data->port_groups;
 
     struct ovsrec_open_vswitch_table *ovs_table =
@@ -1396,8 +1384,7 @@  flow_output_sb_logical_flow_handler(struct engine_node *node)
 
     ovs_assert(br_int && chassis);
 
-    struct ed_type_flow_output *fo =
-        (struct ed_type_flow_output *)node->data;
+    struct ed_type_flow_output *fo = node->internal_data;
     struct ovn_desired_flow_table *flow_table = &fo->flow_table;
     struct ovn_extend_table *group_table = &fo->group_table;
     struct ovn_extend_table *meter_table = &fo->meter_table;
@@ -1452,8 +1439,7 @@  flow_output_sb_mac_binding_handler(struct engine_node *node)
         (struct sbrec_mac_binding_table *)EN_OVSDB_GET(
             engine_get_input("SB_mac_binding", node));
 
-    struct ed_type_flow_output *fo =
-        (struct ed_type_flow_output *)node->data;
+    struct ed_type_flow_output *fo = node->internal_data;
     struct ovn_desired_flow_table *flow_table = &fo->flow_table;
 
     lflow_handle_changed_neighbors(sbrec_port_binding_by_name,
@@ -1467,19 +1453,16 @@  static bool
 flow_output_sb_port_binding_handler(struct engine_node *node)
 {
     struct ed_type_runtime_data *data =
-        (struct ed_type_runtime_data *)engine_get_input(
-                "runtime_data", node)->data;
+        engine_get_input("runtime_data", node)->internal_data;
     struct hmap *local_datapaths = &data->local_datapaths;
     struct sset *active_tunnels = &data->active_tunnels;
 
     struct ed_type_ct_zones *ct_zones_data =
-        (struct ed_type_ct_zones *)engine_get_input(
-            "ct_zones", node)->data;
+        engine_get_input("ct_zones", node)->internal_data;
     struct simap *ct_zones = &ct_zones_data->current;
 
     struct ed_type_mff_ovn_geneve *ed_mff_ovn_geneve =
-        (struct ed_type_mff_ovn_geneve *)engine_get_input(
-            "mff_ovn_geneve", node)->data;
+        engine_get_input("mff_ovn_geneve", node)->internal_data;
     enum mf_field_id mff_ovn_geneve = ed_mff_ovn_geneve->mff_ovn_geneve;
 
     struct ovsrec_open_vswitch_table *ovs_table =
@@ -1501,8 +1484,7 @@  flow_output_sb_port_binding_handler(struct engine_node *node)
     }
     ovs_assert(br_int && chassis);
 
-    struct ed_type_flow_output *fo =
-        (struct ed_type_flow_output *)node->data;
+    struct ed_type_flow_output *fo = node->internal_data;
     struct ovn_desired_flow_table *flow_table = &fo->flow_table;
 
     struct ovsdb_idl_index *sbrec_port_binding_by_name =
@@ -1575,18 +1557,15 @@  static bool
 flow_output_sb_multicast_group_handler(struct engine_node *node)
 {
     struct ed_type_runtime_data *data =
-        (struct ed_type_runtime_data *)engine_get_input(
-                "runtime_data", node)->data;
+        engine_get_input("runtime_data", node)->internal_data;
     struct hmap *local_datapaths = &data->local_datapaths;
 
     struct ed_type_ct_zones *ct_zones_data =
-        (struct ed_type_ct_zones *)engine_get_input(
-            "ct_zones", node)->data;
+        engine_get_input("ct_zones", node)->internal_data;
     struct simap *ct_zones = &ct_zones_data->current;
 
     struct ed_type_mff_ovn_geneve *ed_mff_ovn_geneve =
-        (struct ed_type_mff_ovn_geneve *)engine_get_input(
-            "mff_ovn_geneve", node)->data;
+        engine_get_input("mff_ovn_geneve", node)->internal_data;
     enum mf_field_id mff_ovn_geneve = ed_mff_ovn_geneve->mff_ovn_geneve;
 
     struct ovsrec_open_vswitch_table *ovs_table =
@@ -1608,8 +1587,7 @@  flow_output_sb_multicast_group_handler(struct engine_node *node)
     }
     ovs_assert(br_int && chassis);
 
-    struct ed_type_flow_output *fo =
-        (struct ed_type_flow_output *)node->data;
+    struct ed_type_flow_output *fo = node->internal_data;
     struct ovn_desired_flow_table *flow_table = &fo->flow_table;
 
     struct sbrec_multicast_group_table *multicast_group_table =
@@ -1630,19 +1608,17 @@  _flow_output_resource_ref_handler(struct engine_node *node,
                                  enum ref_type ref_type)
 {
     struct ed_type_runtime_data *data =
-        (struct ed_type_runtime_data *)engine_get_input(
-                "runtime_data", node)->data;
+        engine_get_input("runtime_data", node)->internal_data;
     struct hmap *local_datapaths = &data->local_datapaths;
     struct sset *local_lport_ids = &data->local_lport_ids;
     struct sset *active_tunnels = &data->active_tunnels;
 
     struct ed_type_addr_sets *as_data =
-        (struct ed_type_addr_sets *)engine_get_input("addr_sets", node)->data;
+        engine_get_input("addr_sets", node)->internal_data;
     struct shash *addr_sets = &as_data->addr_sets;
 
     struct ed_type_port_groups *pg_data =
-        (struct ed_type_port_groups *)engine_get_input(
-            "port_groups", node)->data;
+        engine_get_input("port_groups", node)->internal_data;
     struct shash *port_groups = &pg_data->port_groups;
 
     struct ovsrec_open_vswitch_table *ovs_table =
@@ -1666,7 +1642,7 @@  _flow_output_resource_ref_handler(struct engine_node *node,
     ovs_assert(br_int && chassis);
 
     struct ed_type_flow_output *fo =
-        (struct ed_type_flow_output *)node->data;
+        node->internal_data;
     struct ovn_desired_flow_table *flow_table = &fo->flow_table;
     struct ovn_extend_table *group_table = &fo->group_table;
     struct ovn_extend_table *meter_table = &fo->meter_table;
@@ -1899,7 +1875,7 @@  main(int argc, char *argv[])
     struct ed_type_addr_sets ed_addr_sets;
     struct ed_type_port_groups ed_port_groups;
 
-    ENGINE_NODE(ct_zones, "ct_zones");
+    ENGINE_NODE_CUSTOM_DATA(ct_zones, "ct_zones");
     ENGINE_NODE(runtime_data, "runtime_data");
     ENGINE_NODE(mff_ovn_geneve, "mff_ovn_geneve");
     ENGINE_NODE(ofctrl_is_connected, "ofctrl_is_connected");
@@ -2065,7 +2041,10 @@  main(int argc, char *argv[])
             }
 
             if (br_int) {
-                ofctrl_run(br_int, &ed_ct_zones.pending);
+                if (en_ct_zones.data) {
+                    struct ed_type_ct_zones *ct_zones = en_ct_zones.data;
+                    ofctrl_run(br_int, &ct_zones->pending);
+                }
 
                 if (chassis) {
                     patch_run(ovs_idl_txn,
@@ -2110,41 +2089,55 @@  main(int argc, char *argv[])
                     stopwatch_stop(CONTROLLER_LOOP_STOPWATCH_NAME,
                                    time_msec());
                     if (ovs_idl_txn) {
-                        commit_ct_zones(br_int, &ed_ct_zones.pending);
+                        if (en_ct_zones.data) {
+                            struct ed_type_ct_zones *ct_zones =
+                                en_ct_zones.data;
+                            commit_ct_zones(br_int, &ct_zones->pending);
+                        }
                         bfd_run(ovsrec_interface_table_get(ovs_idl_loop.idl),
                                 br_int, chassis,
                                 sbrec_ha_chassis_group_table_get(
                                     ovnsb_idl_loop.idl),
                                 sbrec_sb_global_table_get(ovnsb_idl_loop.idl));
                     }
-                    ofctrl_put(&ed_flow_output.flow_table,
-                               &ed_ct_zones.pending,
-                               sbrec_meter_table_get(ovnsb_idl_loop.idl),
-                               get_nb_cfg(sbrec_sb_global_table_get(
-                                              ovnsb_idl_loop.idl)),
-                               engine_node_changed(&en_flow_output));
-                    pinctrl_run(ovnsb_idl_txn,
-                                sbrec_datapath_binding_by_key,
-                                sbrec_port_binding_by_datapath,
-                                sbrec_port_binding_by_key,
-                                sbrec_port_binding_by_name,
-                                sbrec_mac_binding_by_lport_ip,
-                                sbrec_igmp_group,
-                                sbrec_ip_multicast,
-                                sbrec_dns_table_get(ovnsb_idl_loop.idl),
-                                sbrec_controller_event_table_get(
-                                    ovnsb_idl_loop.idl),
-                                br_int, chassis,
-                                &ed_runtime_data.local_datapaths,
-                                &ed_runtime_data.active_tunnels);
 
-                    if (engine_node_changed(&en_runtime_data)) {
-                        update_sb_monitors(ovnsb_idl_loop.idl, chassis,
-                                           &ed_runtime_data.local_lports,
-                                           &ed_runtime_data.local_datapaths);
+                    if (en_flow_output.data && en_ct_zones.data) {
+                        struct ed_type_ct_zones *ct_zones =
+                            en_ct_zones.data;
+                        struct ed_type_flow_output *flow_output =
+                            en_flow_output.data;
+                        ofctrl_put(&flow_output->flow_table,
+                                   &ct_zones->pending,
+                                   sbrec_meter_table_get(ovnsb_idl_loop.idl),
+                                   get_nb_cfg(sbrec_sb_global_table_get(
+                                                   ovnsb_idl_loop.idl)),
+                                   engine_node_changed(&en_flow_output));
                     }
-                }
 
+                    if (en_runtime_data.data) {
+                        struct ed_type_runtime_data *rt_data =
+                            en_runtime_data.data;
+                        pinctrl_run(ovnsb_idl_txn,
+                                    sbrec_datapath_binding_by_key,
+                                    sbrec_port_binding_by_datapath,
+                                    sbrec_port_binding_by_key,
+                                    sbrec_port_binding_by_name,
+                                    sbrec_mac_binding_by_lport_ip,
+                                    sbrec_igmp_group,
+                                    sbrec_ip_multicast,
+                                    sbrec_dns_table_get(ovnsb_idl_loop.idl),
+                                    sbrec_controller_event_table_get(
+                                        ovnsb_idl_loop.idl),
+                                    br_int, chassis,
+                                    &rt_data->local_datapaths,
+                                    &rt_data->active_tunnels);
+                        if (engine_node_changed(&en_runtime_data)) {
+                            update_sb_monitors(ovnsb_idl_loop.idl, chassis,
+                                               &rt_data->local_lports,
+                                               &rt_data->local_datapaths);
+                        }
+                    }
+                }
             }
             if (engine_need_run(en_nodes, en_count)) {
                 VLOG_DBG("engine did not run, force recompute next time: "
@@ -2173,9 +2166,14 @@  main(int argc, char *argv[])
 
 
             if (pending_pkt.conn) {
-                if (br_int && chassis) {
+                if (br_int && chassis && en_addr_sets.data &&
+                        en_port_groups.data) {
+                    struct ed_type_addr_sets *as_data =
+                        en_addr_sets.data;
+                    struct ed_type_port_groups *pg_data =
+                        en_port_groups.data;
                     char *error = ofctrl_inject_pkt(br_int, pending_pkt.flow_s,
-                        &ed_addr_sets.addr_sets, &ed_port_groups.port_groups);
+                        &as_data->addr_sets, &pg_data->port_groups);
                     if (error) {
                         unixctl_command_reply_error(pending_pkt.conn, error);
                         free(error);
@@ -2213,12 +2211,15 @@  main(int argc, char *argv[])
         }
 
         if (ovsdb_idl_loop_commit_and_wait(&ovs_idl_loop) == 1) {
-            struct shash_node *iter, *iter_next;
-            SHASH_FOR_EACH_SAFE (iter, iter_next, &ed_ct_zones.pending) {
-                struct ct_zone_pending_entry *ctzpe = iter->data;
-                if (ctzpe->state == CT_ZONE_DB_SENT) {
-                    shash_delete(&ed_ct_zones.pending, iter);
-                    free(ctzpe);
+            if (en_ct_zones.data) {
+                struct ed_type_ct_zones *ct_zones = en_ct_zones.data;
+                struct shash_node *iter, *iter_next;
+                SHASH_FOR_EACH_SAFE (iter, iter_next, &ct_zones->pending) {
+                    struct ct_zone_pending_entry *ctzpe = iter->data;
+                    if (ctzpe->state == CT_ZONE_DB_SENT) {
+                        shash_delete(&ct_zones->pending, iter);
+                        free(ctzpe);
+                    }
                 }
             }
         }
diff --git a/lib/inc-proc-eng.c b/lib/inc-proc-eng.c
index b438a15..b50a854 100644
--- a/lib/inc-proc-eng.c
+++ b/lib/inc-proc-eng.c
@@ -149,7 +149,7 @@  engine_add_input(struct engine_node *node, struct engine_node *input,
 struct ovsdb_idl_index *
 engine_ovsdb_node_get_index(struct engine_node *node, const char *name)
 {
-    struct ed_type_ovsdb_table *ed = (struct ed_type_ovsdb_table *)node->data;
+    struct ed_type_ovsdb_table *ed = node->internal_data;
     for (size_t i = 0; i < ed->n_indexes; i++) {
         if (!strcmp(ed->indexes[i].name, name)) {
             return ed->indexes[i].index;
@@ -163,7 +163,7 @@  void
 engine_ovsdb_node_add_index(struct engine_node *node, const char *name,
                             struct ovsdb_idl_index *index)
 {
-    struct ed_type_ovsdb_table *ed = (struct ed_type_ovsdb_table *)node->data;
+    struct ed_type_ovsdb_table *ed = node->internal_data;
     ovs_assert(ed->n_indexes < ENGINE_MAX_OVSDB_INDEX);
 
     ed->indexes[ed->n_indexes].name = name;
@@ -191,6 +191,9 @@  engine_set_node_state_at(struct engine_node *node,
 static bool
 engine_node_valid(struct engine_node *node)
 {
+    if (node->is_valid) {
+        return node->is_valid(node);
+    }
     return (node->state == EN_UPDATED || node->state == EN_VALID);
 }
 
@@ -212,6 +215,15 @@  engine_aborted(struct engine_node *node)
     return node->state == EN_ABORTED;
 }
 
+static void *
+engine_get_data(struct engine_node *node)
+{
+    if (engine_node_valid(node)) {
+        return node->internal_data;
+    }
+    return NULL;
+}
+
 void
 engine_init_run(struct engine_node **nodes, size_t n_count,
                 struct engine_node *root_node)
@@ -224,6 +236,11 @@  engine_init_run(struct engine_node **nodes, size_t n_count,
     VLOG_DBG("Initializing new run");
     for (size_t i = 0; i < n_count; i++) {
         engine_set_node_state(nodes[i], EN_STALE);
+
+        /* Make sure we reset the data pointer for outside users.
+         * For nodes that always store valid data the value will be non-NULL.
+         */
+        nodes[i]->data = engine_get_data(nodes[i]);
     }
 }
 
@@ -346,6 +363,10 @@  engine_run(struct engine_node **nodes, size_t n_count)
 {
     for (size_t i = 0; i < n_count; i++) {
         engine_run_node(nodes[i]);
+        /* Make sure we reset the data pointer for outside users as the
+         * node's state might have changed.
+         */
+        nodes[i]->data = engine_get_data(nodes[i]);
     }
 }
 
diff --git a/lib/inc-proc-eng.h b/lib/inc-proc-eng.h
index 9a35f1f..18a27e5 100644
--- a/lib/inc-proc-eng.h
+++ b/lib/inc-proc-eng.h
@@ -111,6 +111,12 @@  struct engine_node {
      * and run() function of the current node. Users should ensure that the
      * data is read-only in change-handlers of the nodes that depends on this
      * node. */
+    void *internal_data;
+
+    /* A pointer to node data accessible for users outside the processing
+     * engine. The value of the pointer is updated by the engine itself and
+     * users should ensure that the data is only read.
+     */
     void *data;
 
     /* State of the node after the last engine run. */
@@ -125,6 +131,13 @@  struct engine_node {
     /* Fully processes all inputs of this node and regenerates the data
      * of this node */
     void (*run)(struct engine_node *);
+
+    /* Method to validate if the 'internal_data' is valid. This allows users
+     * to customize when 'internal_data' can be used (e.g., even if the node
+     * hasn't been refreshed in the last iteration, if 'internal_data'
+     * doesn't store pointers to DB records it's still safe to use).
+     */
+    bool (*is_valid)(struct engine_node *);
 };
 
 /* Return the array of topologically sorted nodes when starting from
@@ -211,7 +224,7 @@  struct ed_type_ovsdb_table {
 };
 
 #define EN_OVSDB_GET(NODE) \
-    (((struct ed_type_ovsdb_table *)NODE->data)->table)
+    (((struct ed_type_ovsdb_table *)NODE->internal_data)->table)
 
 struct ovsdb_idl_index * engine_ovsdb_node_get_index(struct engine_node *,
                                                      const char *name);
@@ -220,16 +233,25 @@  void engine_ovsdb_node_add_index(struct engine_node *, const char *name,
                                  struct ovsdb_idl_index *);
 
 /* Macro to define an engine node. */
-#define ENGINE_NODE(NAME, NAME_STR) \
+#define ENGINE_NODE_DEF(NAME, NAME_STR) \
     struct engine_node en_##NAME = { \
         .name = NAME_STR, \
-        .data = &ed_##NAME, \
+        .internal_data = &ed_##NAME, \
+        .data = NULL, \
         .state = EN_STALE, \
         .init = en_##NAME##_init, \
         .run = en_##NAME##_run, \
         .cleanup = en_##NAME##_cleanup, \
+        .is_valid = en_##NAME##_is_valid, \
     };
 
+#define ENGINE_NODE_CUSTOM_DATA(NAME, NAME_STR) \
+    ENGINE_NODE_DEF(NAME, NAME_STR)
+
+#define ENGINE_NODE(NAME, NAME_STR) \
+    static bool (*en_##NAME##_is_valid)(struct engine_node *node) = NULL; \
+    ENGINE_NODE_DEF(NAME, NAME_STR)
+
 /* Macro to define member functions of an engine node which represents
  * a table of OVSDB */
 #define ENGINE_FUNC_OVSDB(DB_NAME, TBL_NAME) \