
[ovs-dev,ovn,v2] ovn-controller: Fix use of dangling pointers in I-P runtime_data.

Message ID 1572872026-29054-1-git-send-email-dceara@redhat.com
State Superseded
Series [ovs-dev,ovn,v2] ovn-controller: Fix use of dangling pointers in I-P runtime_data.

Commit Message

Dumitru Ceara Nov. 4, 2019, 12:53 p.m. UTC
The incremental processing engine might stop a run before the
en_runtime_data node is processed. In such cases the ed_runtime_data
fields might contain pointers to already deleted SB records. For
example, if a port binding corresponding to a patch port is removed from
the SB database and the incremental processing engine aborts before the
en_runtime_data node is processed, then the corresponding local_datapath
hashtable entry in ed_runtime_data is stale and will store a pointer to
the already freed sbrec_port_binding record.

This will cause invalid memory accesses in various places (e.g.,
pinctrl_run() -> prepare_ipv6_ras()).

To fix the issue we need a way to track how each node was processed
during an engine run. This commit transforms the 'changed' field in
struct engine_node in a 'state' field. Possible node states are:
- "New": the node is not yet initialized.
- "Stale": data in the node is not up to date with the DB.
- "Updated": data in the node is valid but was updated during
  the last run of the engine.
- "Valid": data in the node is valid and didn't change during
  the last run of the engine.
- "Aborted": during the last run, processing was aborted for
  this node.
- "Destroyed": the node was already cleaned up.

We also add a separation between engine node data that can be accessed
at any time (regardless of whether the last engine run was successful)
and data that may be accessed only if the nodes are up to date. This
helps avoid custom "engine_node_valid" handlers for different nodes.

The commit also simplifies the logic of calling engine_run and
engine_need_run in order to reduce the number of external variables
required to track the result of the last engine execution.
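
Roughly, instead of tracking the run result in an external variable, the
main loop now asks the engine directly (adapted from the ovn-controller.c
hunk below):

if (!engine_aborted(&en_flow_output)) {
    engine_set_abort_recompute(true);
    engine_run(&en_flow_output, engine_run_id);
}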

Functions that need to be called from the main loop and depend on
various data contents of the engine's nodes are now called only if
the data is up to date.
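
For example (adapted from the ovn-controller.c hunk below), installing OF
flows is now gated on the en_flow_output node having valid data:

if (engine_node_valid(&en_flow_output, engine_run_id)) {
    ofctrl_put(&storage.flow_table,
               &storage.pending_ct_zones,
               sbrec_meter_table_get(ovnsb_idl_loop.idl),
               get_nb_cfg(sbrec_sb_global_table_get(ovnsb_idl_loop.idl)),
               engine_node_changed(&en_flow_output, engine_run_id));
}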

CC: Han Zhou <hzhou8@ebay.com>
Fixes: ca278d98a4f5 ("ovn-controller: Initial use of incremental engine - quiet mode.")
Signed-off-by: Dumitru Ceara <dceara@redhat.com>

---
v2: Address Han's comments:
- call engine_node_valid() in all the places where node local data is
  used.
- move out "global" data outside the engine nodes. Make a clear
  separation between data that can be safely used at any time and data
  that can be used only when the engine run was successful.
- add a debug log for iterations when the engine didn't run.
- refactor a bit more the incremental engine code.
---
 controller/ovn-controller.c | 312 +++++++++++++++++++++++++++-----------------
 lib/inc-proc-eng.c          | 233 ++++++++++++++++++++++++++-------
 lib/inc-proc-eng.h          |  72 ++++++++--
 3 files changed, 433 insertions(+), 184 deletions(-)

Comments

Han Zhou Nov. 8, 2019, 7:22 p.m. UTC | #1
1. storage data and the void *arg of init() breaks the engine node data
encapsulation.
2. engine_node_valid(&en_flow_output, engine_run_id) is not needed? Should
use storage to access instead?
3. refactor of engine is good but better to be a separate commit
4. we can have a new interface: engine_get_data(), which returns data if it
is valid. we should never expose the data directly. We should init the
engine node with dynamically allocated engine data structure (and remember
to free during destroy)

Hi Dumitru,

Sorry for late response.
On Mon, Nov 4, 2019 at 4:54 AM Dumitru Ceara <dceara@redhat.com> wrote:
>
> The incremental processing engine might stop a run before the
> en_runtime_data node is processed. In such cases the ed_runtime_data
> fields might contain pointers to already deleted SB records. For
> example, if a port binding corresponding to a patch port is removed from
> the SB database and the incremental processing engine aborts before the
> en_runtime_data node is processed then the corresponding local_datapath
> hashtable entry in ed_runtime_data is stale and will store a pointer to
> the already freed sbrec_port_binding record.
>
> This will cause invalid memory accesses in various places (e.g.,
> pinctrl_run() -> prepare_ipv6_ras()).
>
> To fix the issue we need a way to track how each node was processed
> during an engine run. This commit transforms the 'changed' field in
> struct engine_node in a 'state' field. Possible node states are:
> - "New": the node is not yet initialized.
> - "Stale": data in the node is not up to date with the DB.
> - "Updated": data in the node is valid but was updated during
>   the last run of the engine.
> - "Valid": data in the node is valid and didn't change during
>   the last run of the engine.
> - "Aborted": during the last run, processing was aborted for
>   this node.
> - "Destroyed": the node was already cleaned up.
>
> We also add a separation between engine node data that can be accessed
> at any time (regardless if the last engine run was successful or not)
> and data that may be accessed only if the nodes are up to date. This
> helps avoiding custom "engine_node_valid" handlers for different
> nodes.
>
> The commit also simplifies the logic of calling engine_run and
> engine_need_run in order to reduce the number of external variables
> required to track the result of the last engine execution.
>
> Functions that need to be called from the main loop and depend on
> various data contents of the engine's nodes are now called only if
> the data is up to date.
>
> CC: Han Zhou <hzhou8@ebay.com>
> Fixes: ca278d98a4f5 ("ovn-controller: Initial use of incremental engine - quiet mode.")
> Signed-off-by: Dumitru Ceara <dceara@redhat.com>
>
> ---
> v2: Address Han's comments:
> - call engine_node_valid() in all the places where node local data is
>   used.
> - move out "global" data outside the engine nodes. Make a clear
>   separation between data that can be safely used at any time and data
>   that can be used only when the engine run was successful.

I am concerned with this kind of separation of *global* data, which breaks
the data encapsulation of the engine nodes and can easily result in hidden
dependencies. As you know, the main purpose of the I-P engine is to force
explicit dependencies between different engine nodes and thus help ensure
the correctness of incremental processing.

Here is my proposal to address the problem with better encapsulation.

Firstly, let's avoid direct engine data access outside of the engine module.
At engine node construction, instead of using a reference to a stack variable
(such as struct ed_type_runtime_data ed_runtime_data), we can allocate the
memory in the engine node's init() interface and free it in the cleanup()
interface. This way, there will be no way to directly access engine data
like &ed_runtime_data.local_datapaths.

Secondly, let's add an engine module interface, engine_get_data(), to retrieve
*and validate* the data for an engine node:
void *
engine_get_data(struct engine_node *node, uint64_t run_id)
{
    if (engine_node_valid(node, run_id)) {
        return node->data;
    }
    return NULL;
}

This should be used whenever an engine node's data needs to be accessed. (It
may be even better to take the node name as input instead of the node
structure, but either way seems ok to me.)
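
For instance, a hypothetical caller (reusing the existing en_runtime_data
node and its ed_type_runtime_data type) could then look roughly like:

struct ed_type_runtime_data *rt_data =
    engine_get_data(&en_runtime_data, engine_run_id);
if (rt_data) {
    /* Safe to use rt_data->local_datapaths etc. here. */
}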

Now let's address the always-valid node problem. I was proposing an
is_valid() interface for engine node. It can be NULL by default, but if a
node xxx's data is always valid, it can be implemented like:

static bool
en_xxx_is_valid(struct engine_node *node)
{
    /* This node's data will always be valid */
    return true;
}

For the engine_node_valid() function, it can be changed slightly:

bool
engine_node_valid(struct engine_node *node, uint64_t run_id)
{
    if (node->is_valid) {
        return node->is_valid(node);
    }
    return node->run_id == run_id &&
        (node->state == EN_UPDATED || node->state == EN_VALID);
}

So if is_valid() is not implemented, the default check is used; otherwise,
whatever logic is_valid() implements applies. This is flexible and may fit
new cases: if a node's data validity depends on some other factor, it can
always be customized in that node's is_valid() interface.

This may result in a little more code, since we will have to use
engine_get_data() to retrieve a node's data (and cast it to the node's
data type) before accessing it, but it would look cleaner and safer, while
keeping the dependencies well maintained. Does this sound plausible?

Thanks for the other updates and refactors. Please see some minor comments
inlined below.

>
> -                    if (en_runtime_data.changed) {
> +                    /* We need to make sure the en_flow_output node was
> +                     * properly computed before trying to install OF flows.
> +                     */
> +                    if (engine_node_valid(&en_flow_output, engine_run_id)) {

I guess your intention of using storage.flow_table here was to avoid the
engine_node_valid check, so that this "if" can be avoided, right? (However,
I suggest not using storage at all as discussed above)

> +                        ofctrl_put(&storage.flow_table,
> +                                   &storage.pending_ct_zones,
> +                                   sbrec_meter_table_get(ovnsb_idl_loop.idl),
> +                                   get_nb_cfg(sbrec_sb_global_table_get(
> +                                                   ovnsb_idl_loop.idl)),
> +                                   engine_node_changed(&en_flow_output,
> +                                                       engine_run_id));
> +                    }




>      if (engine_force_recompute) {
> -        need_recompute = true;
> -    } else {
> -        for (size_t i = 0; i < node->n_inputs; i++) {
> -            if (node->inputs[i].node->changed) {
> -                need_compute = true;
> -                if (!node->inputs[i].change_handler) {
> -                    need_recompute = true;
> -                    break;
> -                }
> +        engine_recompute(node, true, !engine_abort_recompute);
> +        return;
> +    }
> +
> +    /* If one of the inputs updated data then we need to recompute the
> +     * current node too.
> +     */

The comment could be more accurate as: If any of the inputs updated data
but there is no change_handler, then ...

> +    for (size_t i = 0; i < node->n_inputs; i++) {
> +        if (node->inputs[i].node->state == EN_UPDATED) {
> +            need_compute = true;
> +
> +            /* Trigger a recompute if we don't have a change handler. */
> +            if (!node->inputs[i].change_handler) {
> +                engine_recompute(node, false, !engine_abort_recompute);
> +                return;
>              }
>          }
>      }
>

Thanks,
Han
Han Zhou Nov. 8, 2019, 7:26 p.m. UTC | #2
On Fri, Nov 8, 2019 at 11:22 AM Han Zhou <hzhou@ovn.org> wrote:
>
> 1. storage data and the void *arg of init() breaks the engine node data
encapsulation.
> 2. engine_node_valid(&en_flow_output, engine_run_id) is not needed?
Should use storage to access instead?
> 3. refactor of engine is good but better to be a separate commit
> 4. we can have a new interface: engine_get_data(), which returns data if
it is valid. we should never expose the data directly. We should init the
engine node with dynamically allocated engine data structure (and remember
to free during destroy)

Oops! Please ignore the part above since it was a draft and I forgot to
delete it after editing the formal response; it's mostly redundant :-)
The real response starts here =>
>
> Hi Dumitru,
>
> Sorry for late response.
> On Mon, Nov 4, 2019 at 4:54 AM Dumitru Ceara <dceara@redhat.com> wrote:
> >
> > The incremental processing engine might stop a run before the
> > en_runtime_data node is processed. In such cases the ed_runtime_data
> > fields might contain pointers to already deleted SB records. For
> > example, if a port binding corresponding to a patch port is removed from
> > the SB database and the incremental processing engine aborts before the
> > en_runtime_data node is processed then the corresponding local_datapath
> > hashtable entry in ed_runtime_data is stale and will store a pointer to
> > the already freed sbrec_port_binding record.
> >
> > This will cause invalid memory accesses in various places (e.g.,
> > pinctrl_run() -> prepare_ipv6_ras()).
> >
> > To fix the issue we need a way to track how each node was processed
> > during an engine run. This commit transforms the 'changed' field in
> > struct engine_node in a 'state' field. Possible node states are:
> > - "New": the node is not yet initialized.
> > - "Stale": data in the node is not up to date with the DB.
> > - "Updated": data in the node is valid but was updated during
> >   the last run of the engine.
> > - "Valid": data in the node is valid and didn't change during
> >   the last run of the engine.
> > - "Aborted": during the last run, processing was aborted for
> >   this node.
> > - "Destroyed": the node was already cleaned up.
> >
> > We also add a separation between engine node data that can be accessed
> > at any time (regardless if the last engine run was successful or not)
> > and data that may be accessed only if the nodes are up to date. This
> > helps avoiding custom "engine_node_valid" handlers for different
> > nodes.
> >
> > The commit also simplifies the logic of calling engine_run and
> > engine_need_run in order to reduce the number of external variables
> > required to track the result of the last engine execution.
> >
> > Functions that need to be called from the main loop and depend on
> > various data contents of the engine's nodes are now called only if
> > the data is up to date.
> >
> > CC: Han Zhou <hzhou8@ebay.com>
> > Fixes: ca278d98a4f5 ("ovn-controller: Initial use of incremental engine - quiet mode.")
> > Signed-off-by: Dumitru Ceara <dceara@redhat.com>
> >
> > ---
> > v2: Address Han's comments:
> > - call engine_node_valid() in all the places where node local data is
> >   used.
> > - move out "global" data outside the engine nodes. Make a clear
> >   separation between data that can be safely used at any time and data
> >   that can be used only when the engine run was successful.
>
> I am concerned with this kind of separation of *global* data, which
breaks the data encapsulation of engine node, and can easily result in
hidden dependency. As you know the main purpose of the I-P engine is to
force explicit dependency exposed between different engine nodes thus
ensure the correctness (at least it helps to ensure) of incremental
processing.
>
> Here is my proposal to address the problem with better encapsulation.
>
> Firstly, let's avoid direct engine data access outside of engine module.
At engine node construction, instead of using reference of stack variable
(such as struct ed_type_runtime_data ed_runtime_data), we can allocate the
memory in the engine node's init() interface, and free in the cleanup()
interface. This way, there will be no way to directly access engine data
like &ed_runtime_data.local_datapaths.
>
> Secondly, let's add a engine module interface engine_get_data() to
retrieve *and validate* data for an engine node:
> void *
> engine_get_data(struct engine_node *node, uint64_t run_id)
> {
>     if (engine_node_valid(node, run_id)) {
>         return node->data;
>     }
>     return NULL;
> }
>
> This should be used whenever an engine node data need to be accessed. (It
may be even better to use node name as input instead of node structure, but
it seems ok to me either way)
>
> Now let's address the always-valid node problem. I was proposing an
is_valid() interface for engine node. It can be NULL by default, but if a
node xxx's data is always valid, it can be implemented like:
>
> static bool
> en_xxx_is_valid(struct engine_node *node)
> {
>     /* This node's data will always be valid */
>     return true;
> }
>
> For the engine_node_valid() function, it can be changed slightly:
>
> bool
> engine_node_valid(struct engine_node *node, uint64_t run_id)
> {
>     if (node->is_valid) {
>         return node->is_valid();
>     }
>     return node->run_id == run_id &&
>         (node->state == EN_UPDATED || node->state == EN_VALID);
> }
>
> So if is_valid is not implemented, it will be the default check,
otherwise, follow whatever logic is used for is_valid(). This is flexible
and may fit for some new cases if a node's data validity depends on some
other factors it can always be customized in is_valid() interface of that
node.
>
> This may result in a little bit more code, since we will have to use
engine_get_data() to retrieve data from a node (and convert to its node's
data type) before accessing, but it would look more clean and safe, while
keeping the dependency well maintained. Does this sound plausible?
>
> Thanks for the other updates and refactors. Please see some minor
comments inlined below.
>
> >
> > -                    if (en_runtime_data.changed) {
> > +                    /* We need to make sure the en_flow_output node was
> > +                     * properly computed before trying to install OF flows.
> > +                     */
> > +                    if (engine_node_valid(&en_flow_output, engine_run_id)) {
>
> I guess your intention of using storage.flow_table here was to avoid the
engine_node_valid check, so that this "if" can be avoided, right? (However,
I suggest not using storage at all as discussed above)
>
> > +                        ofctrl_put(&storage.flow_table,
> > +                                   &storage.pending_ct_zones,
> > +                                   sbrec_meter_table_get(ovnsb_idl_loop.idl),
> > +                                   get_nb_cfg(sbrec_sb_global_table_get(
> > +                                                   ovnsb_idl_loop.idl)),
> > +                                   engine_node_changed(&en_flow_output,
> > +                                                       engine_run_id));
> > +                    }
>
>
>
>
> >      if (engine_force_recompute) {
> > -        need_recompute = true;
> > -    } else {
> > -        for (size_t i = 0; i < node->n_inputs; i++) {
> > -            if (node->inputs[i].node->changed) {
> > -                need_compute = true;
> > -                if (!node->inputs[i].change_handler) {
> > -                    need_recompute = true;
> > -                    break;
> > -                }
> > +        engine_recompute(node, true, !engine_abort_recompute);
> > +        return;
> > +    }
> > +
> > +    /* If one of the inputs updated data then we need to recompute the
> > +     * current node too.
> > +     */
>
> The comment could be more accurate as: If any of the inputs updated data
but there is no change_handler, then ...
>
> > +    for (size_t i = 0; i < node->n_inputs; i++) {
> > +        if (node->inputs[i].node->state == EN_UPDATED) {
> > +            need_compute = true;
> > +
> > +            /* Trigger a recompute if we don't have a change handler. */
> > +            if (!node->inputs[i].change_handler) {
> > +                engine_recompute(node, false, !engine_abort_recompute);
> > +                return;
> >              }
> >          }
> >      }
> >
>
> Thanks,
> Han
Dumitru Ceara Nov. 13, 2019, 8:18 a.m. UTC | #3
On Fri, Nov 8, 2019 at 8:27 PM Han Zhou <hzhou@ovn.org> wrote:
>
>
>
> On Fri, Nov 8, 2019 at 11:22 AM Han Zhou <hzhou@ovn.org> wrote:
> >
> > 1. storage data and the void *arg of init() breaks the engine node data encapsulation.
> > 2. engine_node_valid(&en_flow_output, engine_run_id) is not needed? Should use storage to access instead?
> > 3. refactor of engine is good but better to be a separate commit
> > 4. we can have a new interface: engine_get_data(), which returns data if it is valid. we should never expose the data directly. We should init the engine node with dynamically allocated engine data structure (and remember to free during destroy)
>
> Oops! please ignore the above part since it was draft and I forgot to delete after editing the formal response, mostly redundant :-)
> Real response started here =>
> >
> > Hi Dumitru,
> >
> > Sorry for late response.
> > On Mon, Nov 4, 2019 at 4:54 AM Dumitru Ceara <dceara@redhat.com> wrote:
> > >
> > > The incremental processing engine might stop a run before the
> > > en_runtime_data node is processed. In such cases the ed_runtime_data
> > > fields might contain pointers to already deleted SB records. For
> > > example, if a port binding corresponding to a patch port is removed from
> > > the SB database and the incremental processing engine aborts before the
> > > en_runtime_data node is processed then the corresponding local_datapath
> > > hashtable entry in ed_runtime_data is stale and will store a pointer to
> > > the already freed sbrec_port_binding record.
> > >
> > > This will cause invalid memory accesses in various places (e.g.,
> > > pinctrl_run() -> prepare_ipv6_ras()).
> > >
> > > To fix the issue we need a way to track how each node was processed
> > > during an engine run. This commit transforms the 'changed' field in
> > > struct engine_node in a 'state' field. Possible node states are:
> > > - "New": the node is not yet initialized.
> > > - "Stale": data in the node is not up to date with the DB.
> > > - "Updated": data in the node is valid but was updated during
> > >   the last run of the engine.
> > > - "Valid": data in the node is valid and didn't change during
> > >   the last run of the engine.
> > > - "Aborted": during the last run, processing was aborted for
> > >   this node.
> > > - "Destroyed": the node was already cleaned up.
> > >
> > > We also add a separation between engine node data that can be accessed
> > > at any time (regardless if the last engine run was successful or not)
> > > and data that may be accessed only if the nodes are up to date. This
> > > helps avoiding custom "engine_node_valid" handlers for different
> > > nodes.
> > >
> > > The commit also simplifies the logic of calling engine_run and
> > > engine_need_run in order to reduce the number of external variables
> > > required to track the result of the last engine execution.
> > >
> > > Functions that need to be called from the main loop and depend on
> > > various data contents of the engine's nodes are now called only if
> > > the data is up to date.
> > >
> > > CC: Han Zhou <hzhou8@ebay.com>
> > > Fixes: ca278d98a4f5 ("ovn-controller: Initial use of incremental engine - quiet mode.")
> > > Signed-off-by: Dumitru Ceara <dceara@redhat.com>
> > >
> > > ---
> > > v2: Address Han's comments:
> > > - call engine_node_valid() in all the places where node local data is
> > >   used.
> > > - move out "global" data outside the engine nodes. Make a clear
> > >   separation between data that can be safely used at any time and data
> > >   that can be used only when the engine run was successful.
> >
> > I am concerned with this kind of separation of *global* data, which breaks the data encapsulation of engine node, and can easily result in hidden dependency. As you know the main purpose of the I-P engine is to force explicit dependency exposed between different engine nodes thus ensure the correctness (at least it helps to ensure) of incremental processing.
> >
> > Here is my proposal to address the problem with better encapsulation.
> >
> > Firstly, let's avoid direct engine data access outside of engine module. At engine node construction, instead of using reference of stack variable (such as struct ed_type_runtime_data ed_runtime_data), we can allocate the memory in the engine node's init() interface, and free in the cleanup() interface. This way, there will be no way to directly access engine data like &ed_runtime_data.local_datapaths.
> >
> > Secondly, let's add a engine module interface engine_get_data() to retrieve *and validate* data for an engine node:
> > void *
> > engine_get_data(struct engine_node *node, uint64_t run_id)
> > {
> >     if (engine_node_valid(node, run_id)) {
> >         return node->data;
> >     }
> >     return NULL;
> > }
> >
> > This should be used whenever an engine node data need to be accessed. (It may be even better to use node name as input instead of node structure, but it seems ok to me either way)
> >
> > Now let's address the always-valid node problem. I was proposing an is_valid() interface for engine node. It can be NULL by default, but if a node xxx's data is always valid, it can be implemented like:
> >
> > static bool
> > en_xxx_is_valid(struct engine_node *node)
> > {
> >     /* This node's data will always be valid */
> >     return true;
> > }
> >
> > For the engine_node_valid() function, it can be changed slightly:
> >
> > bool
> > engine_node_valid(struct engine_node *node, uint64_t run_id)
> > {
> >     if (node->is_valid) {
> >         return node->is_valid();
> >     }
> >     return node->run_id == run_id &&
> >         (node->state == EN_UPDATED || node->state == EN_VALID);
> > }
> >
> > So if is_valid is not implemented, it will be the default check, otherwise, follow whatever logic is used for is_valid(). This is flexible and may fit for some new cases if a node's data validity depends on some other factors it can always be customized in is_valid() interface of that node.
> >
> > This may result in a little bit more code, since we will have to use engine_get_data() to retrieve data from a node (and convert to its node's data type) before accessing, but it would look more clean and safe, while keeping the dependency well maintained. Does this sound plausible?
> >
> > Thanks for the other updates and refactors. Please see some minor comments inlined below.
> >
> > >
> > > -                    if (en_runtime_data.changed) {
> > > +                    /* We need to make sure the en_flow_output node was
> > > +                     * properly computed before trying to install OF flows.
> > > +                     */
> > > +                    if (engine_node_valid(&en_flow_output, engine_run_id)) {
> >
> > I guess your intention of using storage.flow_table here was to avoid the engine_node_valid check, so that this "if" can be avoided, right? (However, I suggest not using storage at all as discussed above)
> >
> > > +                        ofctrl_put(&storage.flow_table,
> > > +                                   &storage.pending_ct_zones,
> > > +                                   sbrec_meter_table_get(ovnsb_idl_loop.idl),
> > > +                                   get_nb_cfg(sbrec_sb_global_table_get(
> > > +                                                   ovnsb_idl_loop.idl)),
> > > +                                   engine_node_changed(&en_flow_output,
> > > +                                                       engine_run_id));
> > > +                    }
> >
> >
> >
> >
> > >      if (engine_force_recompute) {
> > > -        need_recompute = true;
> > > -    } else {
> > > -        for (size_t i = 0; i < node->n_inputs; i++) {
> > > -            if (node->inputs[i].node->changed) {
> > > -                need_compute = true;
> > > -                if (!node->inputs[i].change_handler) {
> > > -                    need_recompute = true;
> > > -                    break;
> > > -                }
> > > +        engine_recompute(node, true, !engine_abort_recompute);
> > > +        return;
> > > +    }
> > > +
> > > +    /* If one of the inputs updated data then we need to recompute the
> > > +     * current node too.
> > > +     */
> >
> > The comment could be more accurate as: If any of the inputs updated data but there is no change_handler, then ...
> >
> > > +    for (size_t i = 0; i < node->n_inputs; i++) {
> > > +        if (node->inputs[i].node->state == EN_UPDATED) {
> > > +            need_compute = true;
> > > +
> > > +            /* Trigger a recompute if we don't have a change handler. */
> > > +            if (!node->inputs[i].change_handler) {
> > > +                engine_recompute(node, false, !engine_abort_recompute);
> > > +                return;
> > >              }
> > >          }
> > >      }
> > >
> >
> > Thanks,
> > Han

Hi Han,

Thanks for your comments. Sorry I didn't reply earlier.
I'll send a new version soon improving the data encapsulation and
addressing the other issues you pointed out.

Regards,
Dumitru
Dumitru Ceara Nov. 14, 2019, 5:11 p.m. UTC | #4
On Wed, Nov 13, 2019 at 9:18 AM Dumitru Ceara <dceara@redhat.com> wrote:
>
> On Fri, Nov 8, 2019 at 8:27 PM Han Zhou <hzhou@ovn.org> wrote:
> >
> >
> >
> > On Fri, Nov 8, 2019 at 11:22 AM Han Zhou <hzhou@ovn.org> wrote:
> > >
> > > 1. storage data and the void *arg of init() breaks the engine node data encapsulation.
> > > 2. engine_node_valid(&en_flow_output, engine_run_id) is not needed? Should use storage to access instead?
> > > 3. refactor of engine is good but better to be a separate commit
> > > 4. we can have a new interface: engine_get_data(), which returns data if it is valid. we should never expose the data directly. We should init the engine node with dynamically allocated engine data structure (and remember to free during destroy)
> >
> > Oops! please ignore the above part since it was draft and I forgot to delete after editing the formal response, mostly redundant :-)
> > Real response started here =>
> > >
> > > Hi Dumitru,
> > >
> > > Sorry for late response.
> > > On Mon, Nov 4, 2019 at 4:54 AM Dumitru Ceara <dceara@redhat.com> wrote:
> > > >
> > > > The incremental processing engine might stop a run before the
> > > > en_runtime_data node is processed. In such cases the ed_runtime_data
> > > > fields might contain pointers to already deleted SB records. For
> > > > example, if a port binding corresponding to a patch port is removed from
> > > > the SB database and the incremental processing engine aborts before the
> > > > en_runtime_data node is processed then the corresponding local_datapath
> > > > hashtable entry in ed_runtime_data is stale and will store a pointer to
> > > > the already freed sbrec_port_binding record.
> > > >
> > > > This will cause invalid memory accesses in various places (e.g.,
> > > > pinctrl_run() -> prepare_ipv6_ras()).
> > > >
> > > > To fix the issue we need a way to track how each node was processed
> > > > during an engine run. This commit transforms the 'changed' field in
> > > > struct engine_node in a 'state' field. Possible node states are:
> > > > - "New": the node is not yet initialized.
> > > > - "Stale": data in the node is not up to date with the DB.
> > > > - "Updated": data in the node is valid but was updated during
> > > >   the last run of the engine.
> > > > - "Valid": data in the node is valid and didn't change during
> > > >   the last run of the engine.
> > > > - "Aborted": during the last run, processing was aborted for
> > > >   this node.
> > > > - "Destroyed": the node was already cleaned up.
> > > >
> > > > We also add a separation between engine node data that can be accessed
> > > > at any time (regardless if the last engine run was successful or not)
> > > > and data that may be accessed only if the nodes are up to date. This
> > > > helps avoiding custom "engine_node_valid" handlers for different
> > > > nodes.
> > > >
> > > > The commit also simplifies the logic of calling engine_run and
> > > > engine_need_run in order to reduce the number of external variables
> > > > required to track the result of the last engine execution.
> > > >
> > > > Functions that need to be called from the main loop and depend on
> > > > various data contents of the engine's nodes are now called only if
> > > > the data is up to date.
> > > >
> > > > CC: Han Zhou <hzhou8@ebay.com>
> > > > Fixes: ca278d98a4f5 ("ovn-controller: Initial use of incremental engine - quiet mode.")
> > > > Signed-off-by: Dumitru Ceara <dceara@redhat.com>
> > > >
> > > > ---
> > > > v2: Address Han's comments:
> > > > - call engine_node_valid() in all the places where node local data is
> > > >   used.
> > > > - move out "global" data outside the engine nodes. Make a clear
> > > >   separation between data that can be safely used at any time and data
> > > >   that can be used only when the engine run was successful.
> > >
> > > I am concerned with this kind of separation of *global* data, which breaks the data encapsulation of engine node, and can easily result in hidden dependency. As you know the main purpose of the I-P engine is to force explicit dependency exposed between different engine nodes thus ensure the correctness (at least it helps to ensure) of incremental processing.
> > >
> > > Here is my proposal to address the problem with better encapsulation.
> > >
> > > Firstly, let's avoid direct engine data access outside of engine module. At engine node construction, instead of using reference of stack variable (such as struct ed_type_runtime_data ed_runtime_data), we can allocate the memory in the engine node's init() interface, and free in the cleanup() interface. This way, there will be no way to directly access engine data like &ed_runtime_data.local_datapaths.
> > >
> > > Secondly, let's add a engine module interface engine_get_data() to retrieve *and validate* data for an engine node:
> > > void *
> > > engine_get_data(struct engine_node *node, uint64_t run_id)
> > > {
> > >     if (engine_node_valid(node, run_id)) {
> > >         return node->data;
> > >     }
> > >     return NULL;
> > > }
> > >
> > > This should be used whenever an engine node data need to be accessed. (It may be even better to use node name as input instead of node structure, but it seems ok to me either way)
> > >
> > > Now let's address the always-valid node problem. I was proposing an is_valid() interface for engine node. It can be NULL by default, but if a node xxx's data is always valid, it can be implemented like:
> > >
> > > static bool
> > > en_xxx_is_valid(struct engine_node *node)
> > > {
> > >     /* This node's data will always be valid */
> > >     return true;
> > > }
> > >
> > > For the engine_node_valid() function, it can be changed slightly:
> > >
> > > bool
> > > engine_node_valid(struct engine_node *node, uint64_t run_id)
> > > {
> > >     if (node->is_valid) {
> > >         return node->is_valid();
> > >     }
> > >     return node->run_id == run_id &&
> > >         (node->state == EN_UPDATED || node->state == EN_VALID);
> > > }
> > >
> > > So if is_valid is not implemented, it will be the default check, otherwise, follow whatever logic is used for is_valid(). This is flexible and may fit for some new cases if a node's data validity depends on some other factors it can always be customized in is_valid() interface of that node.
> > >
> > > This may result in a little bit more code, since we will have to use engine_get_data() to retrieve data from a node (and convert to its node's data type) before accessing, but it would look more clean and safe, while keeping the dependency well maintained. Does this sound plausible?
> > >
> > > Thanks for the other updates and refactors. Please see some minor comments inlined below.
> > >
> > > >
> > > > -                    if (en_runtime_data.changed) {
> > > > +                    /* We need to make sure the en_flow_output node was
> > > > +                     * properly computed before trying to install OF flows.
> > > > +                     */
> > > > +                    if (engine_node_valid(&en_flow_output, engine_run_id)) {
> > >
> > > I guess your intention of using storage.flow_table here was to avoid the engine_node_valid check, so that this "if" can be avoided, right? (However, I suggest not using storage at all as discussed above)
> > >
> > > > +                        ofctrl_put(&storage.flow_table,
> > > > +                                   &storage.pending_ct_zones,
> > > > +                                   sbrec_meter_table_get(ovnsb_idl_loop.idl),
> > > > +                                   get_nb_cfg(sbrec_sb_global_table_get(
> > > > +                                                   ovnsb_idl_loop.idl)),
> > > > +                                   engine_node_changed(&en_flow_output,
> > > > +                                                       engine_run_id));
> > > > +                    }
> > >
> > >
> > >
> > >
> > > >      if (engine_force_recompute) {
> > > > -        need_recompute = true;
> > > > -    } else {
> > > > -        for (size_t i = 0; i < node->n_inputs; i++) {
> > > > -            if (node->inputs[i].node->changed) {
> > > > -                need_compute = true;
> > > > -                if (!node->inputs[i].change_handler) {
> > > > -                    need_recompute = true;
> > > > -                    break;
> > > > -                }
> > > > +        engine_recompute(node, true, !engine_abort_recompute);
> > > > +        return;
> > > > +    }
> > > > +
> > > > +    /* If one of the inputs updated data then we need to recompute the
> > > > +     * current node too.
> > > > +     */
> > >
> > > The comment could be more accurate as: If any of the inputs updated data but there is no change_handler, then ...
> > >
> > > > +    for (size_t i = 0; i < node->n_inputs; i++) {
> > > > +        if (node->inputs[i].node->state == EN_UPDATED) {
> > > > +            need_compute = true;
> > > > +
> > > > +            /* Trigger a recompute if we don't have a change handler. */
> > > > +            if (!node->inputs[i].change_handler) {
> > > > +                engine_recompute(node, false, !engine_abort_recompute);
> > > > +                return;
> > > >              }
> > > >          }
> > > >      }
> > > >
> > >
> > > Thanks,
> > > Han
>
> Hi Han,
>
> Thanks for your comments. Sorry I didn't reply earlier.
> I'll send a new version soon improving the data encapsulation and
> other issues you pointed out.
>
> Regards,
> Dumitru

Hi Han,

I just sent v3 (turned it into a series to make it easier to follow
the changes) where I think I addressed everything we discussed until
now:
https://patchwork.ozlabs.org/project/openvswitch/list/?series=142890

I decided to implement data access a bit differently than you
suggested (through another pointer field in the engine node), but
that's mainly because it makes the code that accesses the data a
bit easier to write. The internal implementation is actually the same
(through an is_valid() callback).

Thanks,
Dumitru

Patch

diff --git a/controller/ovn-controller.c b/controller/ovn-controller.c
index 9ab98be..6d2cbea 100644
--- a/controller/ovn-controller.c
+++ b/controller/ovn-controller.c
@@ -90,6 +90,21 @@  struct pending_pkt {
     char *flow_s;
 };
 
+/* Structure to hold global engine data. This is data that can be safely
+ * accessed at any time after engine_init, regardless if the incremental
+ * engine has updated it or not. The incremental engine is responsible for
+ * managing the memory (i.e., allocate/destroy hashtables and maps).
+ */
+struct engine_storage {
+    struct shash pending_ct_zones;
+    struct simap ct_zones;
+
+    struct ovn_desired_flow_table flow_table;
+    struct ovn_extend_table group_table;
+    struct ovn_extend_table meter_table;
+    struct lflow_resource_ref lflow_resource_ref;
+};
+
 struct local_datapath *
 get_local_datapath(const struct hmap *local_datapaths, uint32_t tunnel_key)
 {
@@ -739,7 +754,7 @@  struct ed_type_ofctrl_is_connected {
 };
 
 static void
-en_ofctrl_is_connected_init(struct engine_node *node)
+en_ofctrl_is_connected_init(struct engine_node *node, void *arg OVS_UNUSED)
 {
     struct ed_type_ofctrl_is_connected *data =
         (struct ed_type_ofctrl_is_connected *)node->data;
@@ -758,10 +773,10 @@  en_ofctrl_is_connected_run(struct engine_node *node)
         (struct ed_type_ofctrl_is_connected *)node->data;
     if (data->connected != ofctrl_is_connected()) {
         data->connected = !data->connected;
-        node->changed = true;
+        engine_set_node_state(node, EN_UPDATED);
         return;
     }
-    node->changed = false;
+    engine_set_node_state(node, EN_VALID);
 }
 
 struct ed_type_addr_sets {
@@ -773,7 +788,7 @@  struct ed_type_addr_sets {
 };
 
 static void
-en_addr_sets_init(struct engine_node *node)
+en_addr_sets_init(struct engine_node *node, void *arg OVS_UNUSED)
 {
     struct ed_type_addr_sets *as = (struct ed_type_addr_sets *)node->data;
     shash_init(&as->addr_sets);
@@ -811,7 +826,7 @@  en_addr_sets_run(struct engine_node *node)
     addr_sets_init(as_table, &as->addr_sets);
 
     as->change_tracked = false;
-    node->changed = true;
+    engine_set_node_state(node, EN_UPDATED);
 }
 
 static bool
@@ -830,11 +845,14 @@  addr_sets_sb_address_set_handler(struct engine_node *node)
     addr_sets_update(as_table, &as->addr_sets, &as->new,
                      &as->deleted, &as->updated);
 
-    node->changed = !sset_is_empty(&as->new) || !sset_is_empty(&as->deleted)
-                    || !sset_is_empty(&as->updated);
+    if (!sset_is_empty(&as->new) || !sset_is_empty(&as->deleted) ||
+            !sset_is_empty(&as->updated)) {
+        engine_set_node_state(node, EN_UPDATED);
+    } else {
+        engine_set_node_state(node, EN_VALID);
+    }
 
     as->change_tracked = true;
-    node->changed = true;
     return true;
 }
 
@@ -847,7 +865,7 @@  struct ed_type_port_groups{
 };
 
 static void
-en_port_groups_init(struct engine_node *node)
+en_port_groups_init(struct engine_node *node, void *arg OVS_UNUSED)
 {
     struct ed_type_port_groups *pg = (struct ed_type_port_groups *)node->data;
     shash_init(&pg->port_groups);
@@ -885,7 +903,7 @@  en_port_groups_run(struct engine_node *node)
     port_groups_init(pg_table, &pg->port_groups);
 
     pg->change_tracked = false;
-    node->changed = true;
+    engine_set_node_state(node, EN_UPDATED);
 }
 
 static bool
@@ -904,11 +922,14 @@  port_groups_sb_port_group_handler(struct engine_node *node)
     port_groups_update(pg_table, &pg->port_groups, &pg->new,
                      &pg->deleted, &pg->updated);
 
-    node->changed = !sset_is_empty(&pg->new) || !sset_is_empty(&pg->deleted)
-                    || !sset_is_empty(&pg->updated);
+    if (!sset_is_empty(&pg->new) || !sset_is_empty(&pg->deleted) ||
+            !sset_is_empty(&pg->updated)) {
+        engine_set_node_state(node, EN_UPDATED);
+    } else {
+        engine_set_node_state(node, EN_VALID);
+    }
 
     pg->change_tracked = true;
-    node->changed = true;
     return true;
 }
 
@@ -930,13 +951,15 @@  struct ed_type_runtime_data {
 
     /* connection tracking zones. */
     unsigned long ct_zone_bitmap[BITMAP_N_LONGS(MAX_CT_ZONES)];
-    struct shash pending_ct_zones;
-    struct simap ct_zones;
+    struct shash *pending_ct_zones;
+    struct simap *ct_zones;
 };
 
 static void
-en_runtime_data_init(struct engine_node *node)
+en_runtime_data_init(struct engine_node *node, void *arg)
 {
+    struct engine_storage *storage = arg;
+
     struct ed_type_runtime_data *data =
         (struct ed_type_runtime_data *)node->data;
     struct ovsrec_open_vswitch_table *ovs_table =
@@ -949,14 +972,18 @@  en_runtime_data_init(struct engine_node *node)
     sset_init(&data->local_lports);
     sset_init(&data->local_lport_ids);
     sset_init(&data->active_tunnels);
-    shash_init(&data->pending_ct_zones);
-    simap_init(&data->ct_zones);
+
+    data->pending_ct_zones = &storage->pending_ct_zones;
+    data->ct_zones         = &storage->ct_zones;
+
+    shash_init(data->pending_ct_zones);
+    simap_init(data->ct_zones);
 
     /* Initialize connection tracking zones. */
     memset(data->ct_zone_bitmap, 0, sizeof data->ct_zone_bitmap);
     bitmap_set1(data->ct_zone_bitmap, 0); /* Zone 0 is reserved. */
     restore_ct_zones(bridge_table, ovs_table,
-                     &data->ct_zones, data->ct_zone_bitmap);
+                     data->ct_zones, data->ct_zone_bitmap);
 }
 
 static void
@@ -978,8 +1005,8 @@  en_runtime_data_cleanup(struct engine_node *node)
     }
     hmap_destroy(&data->local_datapaths);
 
-    simap_destroy(&data->ct_zones);
-    shash_destroy(&data->pending_ct_zones);
+    simap_destroy(data->ct_zones);
+    shash_destroy(data->pending_ct_zones);
 }
 
 static void
@@ -992,8 +1019,8 @@  en_runtime_data_run(struct engine_node *node)
     struct sset *local_lport_ids = &data->local_lport_ids;
     struct sset *active_tunnels = &data->active_tunnels;
     unsigned long *ct_zone_bitmap = data->ct_zone_bitmap;
-    struct shash *pending_ct_zones = &data->pending_ct_zones;
-    struct simap *ct_zones = &data->ct_zones;
+    struct shash *pending_ct_zones = data->pending_ct_zones;
+    struct simap *ct_zones = data->ct_zones;
 
     static bool first_run = true;
     if (first_run) {
@@ -1091,7 +1118,7 @@  en_runtime_data_run(struct engine_node *node)
     update_ct_zones(local_lports, local_datapaths, ct_zones,
                     ct_zone_bitmap, pending_ct_zones);
 
-    node->changed = true;
+    engine_set_node_state(node, EN_UPDATED);
 }
 
 static bool
@@ -1137,7 +1164,7 @@  struct ed_type_mff_ovn_geneve {
 };
 
 static void
-en_mff_ovn_geneve_init(struct engine_node *node)
+en_mff_ovn_geneve_init(struct engine_node *node, void *arg OVS_UNUSED)
 {
     struct ed_type_mff_ovn_geneve *data =
         (struct ed_type_mff_ovn_geneve *)node->data;
@@ -1157,35 +1184,43 @@  en_mff_ovn_geneve_run(struct engine_node *node)
     enum mf_field_id mff_ovn_geneve = ofctrl_get_mf_field_id();
     if (data->mff_ovn_geneve != mff_ovn_geneve) {
         data->mff_ovn_geneve = mff_ovn_geneve;
-        node->changed = true;
+        engine_set_node_state(node, EN_UPDATED);
         return;
     }
-    node->changed = false;
+    engine_set_node_state(node, EN_VALID);
 }
 
 struct ed_type_flow_output {
     /* desired flows */
-    struct ovn_desired_flow_table flow_table;
+    struct ovn_desired_flow_table *flow_table;
     /* group ids for load balancing */
-    struct ovn_extend_table group_table;
+    struct ovn_extend_table *group_table;
     /* meter ids for QoS */
-    struct ovn_extend_table meter_table;
+    struct ovn_extend_table *meter_table;
+    /* lflow resource cross reference */
+    struct lflow_resource_ref *lflow_resource_ref;
+
     /* conjunction id offset */
     uint32_t conj_id_ofs;
-    /* lflow resource cross reference */
-    struct lflow_resource_ref lflow_resource_ref;
 };
 
 static void
-en_flow_output_init(struct engine_node *node)
+en_flow_output_init(struct engine_node *node, void *arg OVS_UNUSED)
 {
     struct ed_type_flow_output *data =
         (struct ed_type_flow_output *)node->data;
-    ovn_desired_flow_table_init(&data->flow_table);
-    ovn_extend_table_init(&data->group_table);
-    ovn_extend_table_init(&data->meter_table);
+    struct engine_storage *storage = arg;
+
+    data->flow_table         = &storage->flow_table;
+    data->group_table        = &storage->group_table;
+    data->meter_table        = &storage->meter_table;
+    data->lflow_resource_ref = &storage->lflow_resource_ref;
+
+    ovn_desired_flow_table_init(data->flow_table);
+    ovn_extend_table_init(data->group_table);
+    ovn_extend_table_init(data->meter_table);
     data->conj_id_ofs = 1;
-    lflow_resource_init(&data->lflow_resource_ref);
+    lflow_resource_init(data->lflow_resource_ref);
 }
 
 static void
@@ -1193,10 +1228,10 @@  en_flow_output_cleanup(struct engine_node *node)
 {
     struct ed_type_flow_output *data =
         (struct ed_type_flow_output *)node->data;
-    ovn_desired_flow_table_destroy(&data->flow_table);
-    ovn_extend_table_destroy(&data->group_table);
-    ovn_extend_table_destroy(&data->meter_table);
-    lflow_resource_destroy(&data->lflow_resource_ref);
+    ovn_desired_flow_table_destroy(data->flow_table);
+    ovn_extend_table_destroy(data->group_table);
+    ovn_extend_table_destroy(data->meter_table);
+    lflow_resource_destroy(data->lflow_resource_ref);
 }
 
 static void
@@ -1209,7 +1244,7 @@  en_flow_output_run(struct engine_node *node)
     struct sset *local_lports = &rt_data->local_lports;
     struct sset *local_lport_ids = &rt_data->local_lport_ids;
     struct sset *active_tunnels = &rt_data->active_tunnels;
-    struct simap *ct_zones = &rt_data->ct_zones;
+    struct simap *ct_zones = rt_data->ct_zones;
 
     struct ed_type_mff_ovn_geneve *ed_mff_ovn_geneve =
         (struct ed_type_mff_ovn_geneve *)engine_get_input(
@@ -1247,11 +1282,11 @@  en_flow_output_run(struct engine_node *node)
 
     struct ed_type_flow_output *fo =
         (struct ed_type_flow_output *)node->data;
-    struct ovn_desired_flow_table *flow_table = &fo->flow_table;
-    struct ovn_extend_table *group_table = &fo->group_table;
-    struct ovn_extend_table *meter_table = &fo->meter_table;
+    struct ovn_desired_flow_table *flow_table = fo->flow_table;
+    struct ovn_extend_table *group_table = fo->group_table;
+    struct ovn_extend_table *meter_table = fo->meter_table;
+    struct lflow_resource_ref *lfrr = fo->lflow_resource_ref;
     uint32_t *conj_id_ofs = &fo->conj_id_ofs;
-    struct lflow_resource_ref *lfrr = &fo->lflow_resource_ref;
 
     static bool first_run = true;
     if (first_run) {
@@ -1322,7 +1357,7 @@  en_flow_output_run(struct engine_node *node)
                  active_tunnels,
                  flow_table);
 
-    node->changed = true;
+    engine_set_node_state(node, EN_UPDATED);
 }
 
 static bool
@@ -1366,11 +1401,11 @@  flow_output_sb_logical_flow_handler(struct engine_node *node)
 
     struct ed_type_flow_output *fo =
         (struct ed_type_flow_output *)node->data;
-    struct ovn_desired_flow_table *flow_table = &fo->flow_table;
-    struct ovn_extend_table *group_table = &fo->group_table;
-    struct ovn_extend_table *meter_table = &fo->meter_table;
+    struct ovn_desired_flow_table *flow_table = fo->flow_table;
+    struct ovn_extend_table *group_table = fo->group_table;
+    struct ovn_extend_table *meter_table = fo->meter_table;
+    struct lflow_resource_ref *lfrr = fo->lflow_resource_ref;
     uint32_t *conj_id_ofs = &fo->conj_id_ofs;
-    struct lflow_resource_ref *lfrr = &fo->lflow_resource_ref;
 
     struct ovsdb_idl_index *sbrec_multicast_group_by_name_datapath =
         engine_ovsdb_node_get_index(
@@ -1404,7 +1439,7 @@  flow_output_sb_logical_flow_handler(struct engine_node *node)
               flow_table, group_table, meter_table, lfrr,
               conj_id_ofs);
 
-    node->changed = true;
+    engine_set_node_state(node, EN_UPDATED);
     return handled;
 }
 
@@ -1422,12 +1457,12 @@  flow_output_sb_mac_binding_handler(struct engine_node *node)
 
     struct ed_type_flow_output *fo =
         (struct ed_type_flow_output *)node->data;
-    struct ovn_desired_flow_table *flow_table = &fo->flow_table;
+    struct ovn_desired_flow_table *flow_table = fo->flow_table;
 
     lflow_handle_changed_neighbors(sbrec_port_binding_by_name,
             mac_binding_table, flow_table);
 
-    node->changed = true;
+    engine_set_node_state(node, EN_UPDATED);
     return true;
 }
 
@@ -1439,7 +1474,7 @@  flow_output_sb_port_binding_handler(struct engine_node *node)
                 "runtime_data", node)->data;
     struct hmap *local_datapaths = &data->local_datapaths;
     struct sset *active_tunnels = &data->active_tunnels;
-    struct simap *ct_zones = &data->ct_zones;
+    struct simap *ct_zones = data->ct_zones;
 
     struct ed_type_mff_ovn_geneve *ed_mff_ovn_geneve =
         (struct ed_type_mff_ovn_geneve *)engine_get_input(
@@ -1467,7 +1502,7 @@  flow_output_sb_port_binding_handler(struct engine_node *node)
 
     struct ed_type_flow_output *fo =
         (struct ed_type_flow_output *)node->data;
-    struct ovn_desired_flow_table *flow_table = &fo->flow_table;
+    struct ovn_desired_flow_table *flow_table = fo->flow_table;
 
     struct ovsdb_idl_index *sbrec_port_binding_by_name =
         engine_ovsdb_node_get_index(
@@ -1531,7 +1566,7 @@  flow_output_sb_port_binding_handler(struct engine_node *node)
             chassis, ct_zones, local_datapaths,
             active_tunnels, flow_table);
 
-    node->changed = true;
+    engine_set_node_state(node, EN_UPDATED);
     return true;
 }
 
@@ -1542,7 +1577,7 @@  flow_output_sb_multicast_group_handler(struct engine_node *node)
         (struct ed_type_runtime_data *)engine_get_input(
                 "runtime_data", node)->data;
     struct hmap *local_datapaths = &data->local_datapaths;
-    struct simap *ct_zones = &data->ct_zones;
+    struct simap *ct_zones = data->ct_zones;
 
     struct ed_type_mff_ovn_geneve *ed_mff_ovn_geneve =
         (struct ed_type_mff_ovn_geneve *)engine_get_input(
@@ -1570,7 +1605,7 @@  flow_output_sb_multicast_group_handler(struct engine_node *node)
 
     struct ed_type_flow_output *fo =
         (struct ed_type_flow_output *)node->data;
-    struct ovn_desired_flow_table *flow_table = &fo->flow_table;
+    struct ovn_desired_flow_table *flow_table = fo->flow_table;
 
     struct sbrec_multicast_group_table *multicast_group_table =
         (struct sbrec_multicast_group_table *)EN_OVSDB_GET(
@@ -1580,7 +1615,7 @@  flow_output_sb_multicast_group_handler(struct engine_node *node)
             mff_ovn_geneve, chassis, ct_zones, local_datapaths,
             flow_table);
 
-    node->changed = true;
+    engine_set_node_state(node, EN_UPDATED);
     return true;
 
 }
@@ -1627,11 +1662,11 @@  _flow_output_resource_ref_handler(struct engine_node *node,
 
     struct ed_type_flow_output *fo =
         (struct ed_type_flow_output *)node->data;
-    struct ovn_desired_flow_table *flow_table = &fo->flow_table;
-    struct ovn_extend_table *group_table = &fo->group_table;
-    struct ovn_extend_table *meter_table = &fo->meter_table;
+    struct ovn_desired_flow_table *flow_table = fo->flow_table;
+    struct ovn_extend_table *group_table = fo->group_table;
+    struct ovn_extend_table *meter_table = fo->meter_table;
+    struct lflow_resource_ref *lfrr = fo->lflow_resource_ref;
     uint32_t *conj_id_ofs = &fo->conj_id_ofs;
-    struct lflow_resource_ref *lfrr = &fo->lflow_resource_ref;
 
     struct ovsdb_idl_index *sbrec_multicast_group_by_name_datapath =
         engine_ovsdb_node_get_index(
@@ -1694,7 +1729,9 @@  _flow_output_resource_ref_handler(struct engine_node *node,
                     conj_id_ofs, &changed)) {
             return false;
         }
-        node->changed = changed || node->changed;
+        if (changed) {
+            engine_set_node_state(node, EN_UPDATED);
+        }
     }
     SSET_FOR_EACH (ref_name, updated) {
         if (!lflow_handle_changed_ref(ref_type, ref_name,
@@ -1707,7 +1744,9 @@  _flow_output_resource_ref_handler(struct engine_node *node,
                     conj_id_ofs, &changed)) {
             return false;
         }
-        node->changed = changed || node->changed;
+        if (changed) {
+            engine_set_node_state(node, EN_UPDATED);
+        }
     }
     SSET_FOR_EACH (ref_name, new) {
         if (!lflow_handle_changed_ref(ref_type, ref_name,
@@ -1720,7 +1759,9 @@  _flow_output_resource_ref_handler(struct engine_node *node,
                     conj_id_ofs, &changed)) {
             return false;
         }
-        node->changed = changed || node->changed;
+        if (changed) {
+            engine_set_node_state(node, EN_UPDATED);
+        }
     }
 
     return true;
@@ -1922,28 +1963,33 @@  main(int argc, char *argv[])
     engine_add_input(&en_runtime_data, &en_sb_port_binding,
                      runtime_data_sb_port_binding_handler);
 
-    engine_init(&en_flow_output);
+    /* Storage space for engine data that can be accessed at any moment
+     * after engine_init(). */
+    struct engine_storage storage;
+
+    /* Initialize the incremental engine and point it to the global engine
+     * data. The engine takes ownership of the 'storage' memory and makes
+     * sure it's initialized and destroyed properly.
+     */
+    engine_init(&en_flow_output, &storage);
 
-    ofctrl_init(&ed_flow_output.group_table,
-                &ed_flow_output.meter_table,
+    ofctrl_init(&storage.group_table, &storage.meter_table,
                 get_ofctrl_probe_interval(ovs_idl_loop.idl));
 
     unixctl_command_register("group-table-list", "", 0, 0,
-                             group_table_list, &ed_flow_output.group_table);
+                             group_table_list, &storage.group_table);
 
     unixctl_command_register("meter-table-list", "", 0, 0,
-                             meter_table_list, &ed_flow_output.meter_table);
+                             meter_table_list, &storage.meter_table);
 
     unixctl_command_register("ct-zone-list", "", 0, 0,
-                             ct_zone_list, &ed_runtime_data.ct_zones);
+                             ct_zone_list, &storage.ct_zones);
 
     struct pending_pkt pending_pkt = { .conn = NULL };
     unixctl_command_register("inject-pkt", "MICROFLOW", 1, 1, inject_pkt,
                              &pending_pkt);
 
     uint64_t engine_run_id = 0;
-    uint64_t old_engine_run_id = 0;
-    bool engine_run_done = true;
 
     unsigned int ovs_cond_seqno = UINT_MAX;
     unsigned int ovnsb_cond_seqno = UINT_MAX;
@@ -1952,10 +1998,11 @@  main(int argc, char *argv[])
     exiting = false;
     restart = false;
     while (!exiting) {
+        engine_run_id++;
+
         update_sb_db(ovs_idl_loop.idl, ovnsb_idl_loop.idl);
         update_ssl_config(ovsrec_ssl_table_get(ovs_idl_loop.idl));
         ofctrl_set_probe_interval(get_ofctrl_probe_interval(ovs_idl_loop.idl));
-        old_engine_run_id = engine_run_id;
 
         struct ovsdb_idl_txn *ovs_idl_txn = ovsdb_idl_loop_run(&ovs_idl_loop);
         unsigned int new_ovs_cond_seqno
@@ -2011,7 +2058,7 @@  main(int argc, char *argv[])
             }
 
             if (br_int) {
-                ofctrl_run(br_int, &ed_runtime_data.pending_ct_zones);
+                ofctrl_run(br_int, &storage.pending_ct_zones);
 
                 if (chassis) {
                     patch_run(ovs_idl_txn,
@@ -2044,50 +2091,66 @@  main(int argc, char *argv[])
                              * this round of engine_run and continue processing
                              * acculated changes incrementally later when
                              * ofctrl_can_put() returns true. */
-                            if (engine_run_done) {
+                            if (!engine_aborted(&en_flow_output)) {
                                 engine_set_abort_recompute(true);
-                                engine_run_done = engine_run(&en_flow_output,
-                                                             ++engine_run_id);
+                                engine_run(&en_flow_output, engine_run_id);
                             }
                         } else {
                             engine_set_abort_recompute(false);
-                            engine_run_done = true;
-                            engine_run(&en_flow_output, ++engine_run_id);
+                            engine_run(&en_flow_output, engine_run_id);
                         }
                     }
                     stopwatch_stop(CONTROLLER_LOOP_STOPWATCH_NAME,
                                    time_msec());
                     if (ovs_idl_txn) {
-                        commit_ct_zones(br_int,
-                                        &ed_runtime_data.pending_ct_zones);
+                        commit_ct_zones(br_int, &storage.pending_ct_zones);
                         bfd_run(ovsrec_interface_table_get(ovs_idl_loop.idl),
                                 br_int, chassis,
                                 sbrec_ha_chassis_group_table_get(
                                     ovnsb_idl_loop.idl),
                                 sbrec_sb_global_table_get(ovnsb_idl_loop.idl));
                     }
-                    ofctrl_put(&ed_flow_output.flow_table,
-                               &ed_runtime_data.pending_ct_zones,
-                               sbrec_meter_table_get(ovnsb_idl_loop.idl),
-                               get_nb_cfg(sbrec_sb_global_table_get(
-                                              ovnsb_idl_loop.idl)),
-                               en_flow_output.changed);
-                    pinctrl_run(ovnsb_idl_txn,
-                                sbrec_datapath_binding_by_key,
-                                sbrec_port_binding_by_datapath,
-                                sbrec_port_binding_by_key,
-                                sbrec_port_binding_by_name,
-                                sbrec_mac_binding_by_lport_ip,
-                                sbrec_igmp_group,
-                                sbrec_ip_multicast,
-                                sbrec_dns_table_get(ovnsb_idl_loop.idl),
-                                sbrec_controller_event_table_get(
-                                    ovnsb_idl_loop.idl),
-                                br_int, chassis,
-                                &ed_runtime_data.local_datapaths,
-                                &ed_runtime_data.active_tunnels);
 
-                    if (en_runtime_data.changed) {
+                    /* We need to make sure the en_flow_output node was
+                     * properly computed before trying to install OF flows.
+                     */
+                    if (engine_node_valid(&en_flow_output, engine_run_id)) {
+                        ofctrl_put(&storage.flow_table,
+                                   &storage.pending_ct_zones,
+                                   sbrec_meter_table_get(ovnsb_idl_loop.idl),
+                                   get_nb_cfg(sbrec_sb_global_table_get(
+                                                   ovnsb_idl_loop.idl)),
+                                   engine_node_changed(&en_flow_output,
+                                                       engine_run_id));
+                    }
+
+                    /* We need to make sure that at least the runtime data
+                     * (e.g., local datapaths, local lports) are fresh before
+                     * calling pinctrl_run to avoid using potentially freed
+                     * references to database records.
+                     */
+                    if (engine_node_valid(&en_runtime_data, engine_run_id)) {
+                        pinctrl_run(ovnsb_idl_txn,
+                                    sbrec_datapath_binding_by_key,
+                                    sbrec_port_binding_by_datapath,
+                                    sbrec_port_binding_by_key,
+                                    sbrec_port_binding_by_name,
+                                    sbrec_mac_binding_by_lport_ip,
+                                    sbrec_igmp_group,
+                                    sbrec_ip_multicast,
+                                    sbrec_dns_table_get(ovnsb_idl_loop.idl),
+                                    sbrec_controller_event_table_get(
+                                        ovnsb_idl_loop.idl),
+                                    br_int, chassis,
+                                    &ed_runtime_data.local_datapaths,
+                                    &ed_runtime_data.active_tunnels);
+                    }
+
+                    /* If the runtime data changed we might need to update
+                     * the tables we need to monitor from the SB DB.
+                     */
+                    if (engine_node_changed(&en_runtime_data,
+                                            engine_run_id)) {
                         update_sb_monitors(ovnsb_idl_loop.idl, chassis,
                                            &ed_runtime_data.local_lports,
                                            &ed_runtime_data.local_datapaths);
@@ -2095,17 +2158,20 @@  main(int argc, char *argv[])
                 }
 
             }
-            if (old_engine_run_id == engine_run_id || !engine_run_done) {
-                if (!engine_run_done || engine_need_run(&en_flow_output)) {
-                    VLOG_DBG("engine did not run, force recompute next time: "
-                             "br_int %p, chassis %p", br_int, chassis);
-                    engine_set_force_recompute(true);
-                    poll_immediate_wake();
-                } else {
-                    VLOG_DBG("engine did not run, and it was not needed"
-                             " either: br_int %p, chassis %p",
-                             br_int, chassis);
-                }
+            if (engine_need_run(&en_flow_output, engine_run_id)) {
+                VLOG_DBG("engine did not run, force recompute next time: "
+                         "br_int %p, chassis %p", br_int, chassis);
+                engine_set_force_recompute(true);
+                poll_immediate_wake();
+            } else if (engine_aborted(&en_flow_output)) {
+                VLOG_DBG("engine was aborted, force recompute next time: "
+                         "br_int %p, chassis %p", br_int, chassis);
+                engine_set_force_recompute(true);
+                poll_immediate_wake();
+            } else if (!engine_has_run(&en_flow_output, engine_run_id)) {
+                VLOG_DBG("engine did not run, and it was not needed"
+                         " either: br_int %p, chassis %p",
+                         br_int, chassis);
             } else {
                 engine_set_force_recompute(false);
             }
@@ -2117,9 +2183,14 @@  main(int argc, char *argv[])
                 }
             }
 
-
             if (pending_pkt.conn) {
-                if (br_int && chassis) {
+                /* We need to make sure that en_addr_sets and en_port_groups
+                 * nodes contain valid data before trying to inject the
+                 * packet.
+                 */
+                if (br_int && chassis &&
+                        engine_node_valid(&en_addr_sets, engine_run_id) &&
+                        engine_node_valid(&en_port_groups, engine_run_id)) {
                     char *error = ofctrl_inject_pkt(br_int, pending_pkt.flow_s,
                         &ed_addr_sets.addr_sets, &ed_port_groups.port_groups);
                     if (error) {
@@ -2161,11 +2232,10 @@  main(int argc, char *argv[])
 
         if (ovsdb_idl_loop_commit_and_wait(&ovs_idl_loop) == 1) {
             struct shash_node *iter, *iter_next;
-            SHASH_FOR_EACH_SAFE (iter, iter_next,
-                                 &ed_runtime_data.pending_ct_zones) {
+            SHASH_FOR_EACH_SAFE (iter, iter_next, &storage.pending_ct_zones) {
                 struct ct_zone_pending_entry *ctzpe = iter->data;
                 if (ctzpe->state == CT_ZONE_DB_SENT) {
-                    shash_delete(&ed_runtime_data.pending_ct_zones, iter);
+                    shash_delete(&storage.pending_ct_zones, iter);
                     free(ctzpe);
                 }
             }
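
The main-loop hunks above reduce to one pattern: bump the run id once per
iteration, run the engine, and only consume a node's data when
engine_node_valid() holds for that run id. Below is a minimal sketch of one
iteration using the helpers introduced by this patch; the node 'en_example'
and the consumer 'use_example_data()' are hypothetical placeholders, not part
of the patch:

    /* Sketch only: one iteration of a main loop built on the new API.
     * 'en_example' and 'use_example_data()' are made-up placeholders. */
    static uint64_t engine_run_id;

    static void
    example_iteration(void)
    {
        engine_run_id++;
        engine_run(&en_example, engine_run_id);

        /* Data may only be consumed if the node was brought up to date
         * during this very iteration. */
        if (engine_node_valid(&en_example, engine_run_id)) {
            use_example_data();
        }

        /* An aborted run leaves stale data behind; force a recompute on
         * the next iteration. */
        if (engine_aborted(&en_example)) {
            engine_set_force_recompute(true);
            poll_immediate_wake();
        }
    }
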
diff --git a/lib/inc-proc-eng.c b/lib/inc-proc-eng.c
index 1064a08..cbb9c39 100644
--- a/lib/inc-proc-eng.c
+++ b/lib/inc-proc-eng.c
@@ -34,6 +34,15 @@  static bool engine_force_recompute = false;
 static bool engine_abort_recompute = false;
 static const struct engine_context *engine_context;
 
+static const char *engine_node_state_name[EN_STATE_MAX] = {
+    [EN_NEW]       = "New",
+    [EN_STALE]     = "Stale",
+    [EN_UPDATED]   = "Updated",
+    [EN_VALID]     = "Valid",
+    [EN_ABORTED]   = "Aborted",
+    [EN_DESTROYED] = "Destroyed",
+};
+
 void
 engine_set_force_recompute(bool val)
 {
@@ -59,19 +68,35 @@  engine_set_context(const struct engine_context *ctx)
 }
 
 void
-engine_init(struct engine_node *node)
+engine_init(struct engine_node *node, void *arg)
 {
+    if (!engine_node_new(node)) {
+        /* The node was already initialized (could be input for multiple
+         * nodes). Nothing to do then.
+         */
+        return;
+    }
+
+    engine_set_node_state(node, EN_STALE);
     for (size_t i = 0; i < node->n_inputs; i++) {
-        engine_init(node->inputs[i].node);
+        engine_init(node->inputs[i].node, arg);
     }
     if (node->init) {
-        node->init(node);
+        node->init(node, arg);
     }
 }
 
 void
 engine_cleanup(struct engine_node *node)
 {
+    /* The node was already destroyed (could be input for multiple nodes).
+     * Nothing to do then.
+     */
+    if (engine_node_destroyed(node)) {
+        return;
+    }
+
+    engine_set_node_state(node, EN_DESTROYED);
     for (size_t i = 0; i < node->n_inputs; i++) {
         engine_cleanup(node->inputs[i].node);
     }
@@ -128,89 +153,197 @@  engine_ovsdb_node_add_index(struct engine_node *node, const char *name,
     ed->n_indexes ++;
 }
 
+void
+engine_set_node_state_at(struct engine_node *node,
+                         enum engine_node_state state,
+                         const char *where)
+{
+    if (node->state == state) {
+        return;
+    }
+
+    VLOG_DBG("%s: node: %s (run-id %lu), old_state %s, new_state %s",
+             where, node->name, node->run_id,
+             engine_node_state_name[node->state],
+             engine_node_state_name[state]);
+
+    node->state = state;
+}
+
 bool
+engine_node_new(struct engine_node *node)
+{
+    return node->state == EN_NEW;
+}
+
+bool
+engine_node_destroyed(struct engine_node *node)
+{
+    return node->state == EN_DESTROYED;
+}
+
+bool
+engine_node_valid(struct engine_node *node, uint64_t run_id)
+{
+    return node->run_id == run_id &&
+        (node->state == EN_UPDATED || node->state == EN_VALID);
+}
+
+bool
+engine_node_changed(struct engine_node *node, uint64_t run_id)
+{
+    return node->run_id == run_id && node->state == EN_UPDATED;
+}
+
+bool
+engine_has_run(struct engine_node *node, uint64_t run_id)
+{
+    return node->run_id == run_id;
+}
+
+bool
+engine_aborted(struct engine_node *node)
+{
+    return node->state == EN_ABORTED;
+}
+
+/* Do a full recompute (or at least try). If we're not allowed to,
+ * mark the node as EN_ABORTED.
+ */
+static void
+engine_recompute(struct engine_node *node, bool forced, bool allowed)
+{
+    VLOG_DBG("node: %s, recompute (%s)", node->name,
+             forced ? "forced" : "triggered");
+
+    if (!allowed) {
+        VLOG_DBG("node: %s, recompute aborted", node->name);
+        engine_set_node_state(node, EN_ABORTED);
+        return;
+    }
+
+    /* Run the node handler which might change state. */
+    node->run(node);
+}
+
+/* Return true if the node could be computed without triggering a full
+ * recompute.
+ */
+static bool
+engine_compute(struct engine_node *node, bool recompute_allowed)
+{
+    for (size_t i = 0; i < node->n_inputs; i++) {
+        /* If the input node data changed call its change handler. */
+        if (node->inputs[i].node->state == EN_UPDATED) {
+            VLOG_DBG("node: %s, handle change for input %s",
+                     node->name, node->inputs[i].node->name);
+
+            /* If the input change can't be handled incrementally, run
+             * the node handler.
+             */
+            if (!node->inputs[i].change_handler(node)) {
+                VLOG_DBG("node: %s, can't handle change for input %s, "
+                         "fall back to recompute",
+                         node->name, node->inputs[i].node->name);
+                engine_recompute(node, false, recompute_allowed);
+                return false;
+            }
+        }
+    }
+
+    return true;
+}
+
+void
 engine_run(struct engine_node *node, uint64_t run_id)
 {
     if (node->run_id == run_id) {
-        return true;
+        /* The node was already processed in this run (it could be an input
+         * for multiple other nodes). Stop processing.
+         */
+        return;
     }
+
+    /* Initialize the node for this run. */
     node->run_id = run_id;
+    engine_set_node_state(node, EN_STALE);
 
-    node->changed = false;
     if (!node->n_inputs) {
+        /* Run the node handler which might change state. */
         node->run(node);
-        VLOG_DBG("node: %s, changed: %d", node->name, node->changed);
-        return true;
+        return;
     }
 
     for (size_t i = 0; i < node->n_inputs; i++) {
-        if (!engine_run(node->inputs[i].node, run_id)) {
-            return false;
+        engine_run(node->inputs[i].node, run_id);
+        if (!engine_node_valid(node->inputs[i].node, run_id)) {
+            /* If the input node aborted computation, move to EN_ABORTED to
+             * propagate the result, otherwise stay in EN_STALE.
+             */
+            if (engine_aborted(node->inputs[i].node)) {
+                engine_set_node_state(node, EN_ABORTED);
+            }
+            return;
         }
     }
 
     bool need_compute = false;
-    bool need_recompute = false;
 
     if (engine_force_recompute) {
-        need_recompute = true;
-    } else {
-        for (size_t i = 0; i < node->n_inputs; i++) {
-            if (node->inputs[i].node->changed) {
-                need_compute = true;
-                if (!node->inputs[i].change_handler) {
-                    need_recompute = true;
-                    break;
-                }
+        engine_recompute(node, true, !engine_abort_recompute);
+        return;
+    }
+
+    /* If any of the inputs has updated data then the current node needs to
+     * be computed too (incrementally, or with a full recompute if there is
+     * no change handler).
+     */
+    for (size_t i = 0; i < node->n_inputs; i++) {
+        if (node->inputs[i].node->state == EN_UPDATED) {
+            need_compute = true;
+
+            /* Trigger a recompute if we don't have a change handler. */
+            if (!node->inputs[i].change_handler) {
+                engine_recompute(node, false, !engine_abort_recompute);
+                return;
             }
         }
     }
 
-    if (need_recompute) {
-        VLOG_DBG("node: %s, recompute (%s)", node->name,
-                 engine_force_recompute ? "forced" : "triggered");
-        if (engine_abort_recompute) {
-            VLOG_DBG("node: %s, recompute aborted", node->name);
-            return false;
-        }
-        node->run(node);
-    } else if (need_compute) {
-        for (size_t i = 0; i < node->n_inputs; i++) {
-            if (node->inputs[i].node->changed) {
-                VLOG_DBG("node: %s, handle change for input %s",
-                         node->name, node->inputs[i].node->name);
-                if (!node->inputs[i].change_handler(node)) {
-                    VLOG_DBG("node: %s, can't handle change for input %s, "
-                             "fall back to recompute",
-                             node->name, node->inputs[i].node->name);
-                    if (engine_abort_recompute) {
-                        VLOG_DBG("node: %s, recompute aborted", node->name);
-                        return false;
-                    }
-                    node->run(node);
-                    break;
-                }
-            }
+    if (need_compute) {
+        /* If we couldn't compute the node, we either aborted or triggered
+         * a full recompute. In any case, stop processing.
+         */
+        if (!engine_compute(node, !engine_abort_recompute)) {
+            return;
         }
     }
 
-    VLOG_DBG("node: %s, changed: %d", node->name, node->changed);
-    return true;
+    /* If we reached this point, either the node was updated or its state is
+     * still valid.
+     */
+    if (!engine_node_changed(node, run_id)) {
+        engine_set_node_state(node, EN_VALID);
+    }
 }
 
 bool
-engine_need_run(struct engine_node *node)
+engine_need_run(struct engine_node *node, uint64_t run_id)
 {
     size_t i;
 
+    if (node->run_id == run_id) {
+        return false;
+    }
+
     if (!node->n_inputs) {
         node->run(node);
-        VLOG_DBG("input node: %s, changed: %d", node->name, node->changed);
-        return node->changed;
+        VLOG_DBG("input node: %s, state: %s", node->name,
+                 engine_node_state_name[node->state]);
+        return node->state == EN_UPDATED;
     }
 
     for (i = 0; i < node->n_inputs; i++) {
-        if (engine_need_run(node->inputs[i].node)) {
+        if (engine_need_run(node->inputs[i].node, run_id)) {
             return true;
         }
     }
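
To make the change-handler contract under the reworked engine_compute() /
engine_recompute() split concrete: a handler reports incremental success
through its return value and marks data changes by moving its node to
EN_UPDATED instead of setting 'node->changed'. A hedged sketch of such a
handler follows; 'handle_tracked_changes()' is invented purely for
illustration and is not part of the patch:

    /* Sketch only: a change handler under the reworked engine.  Returning
     * false makes engine_compute() fall back to a full recompute (or abort
     * the node if recomputes are currently disallowed). */
    static bool
    example_input_handler(struct engine_node *node)
    {
        bool changed;

        /* 'handle_tracked_changes()' is a hypothetical stand-in for the
         * node-specific incremental processing. */
        if (!handle_tracked_changes(node->data, &changed)) {
            return false;
        }
        if (changed) {
            /* Downstream nodes will see this input as EN_UPDATED. */
            engine_set_node_state(node, EN_UPDATED);
        }
        return true;
    }
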
diff --git a/lib/inc-proc-eng.h b/lib/inc-proc-eng.h
index 3a69dc2..43ef82a 100644
--- a/lib/inc-proc-eng.h
+++ b/lib/inc-proc-eng.h
@@ -82,6 +82,22 @@  struct engine_node_input {
     bool (*change_handler)(struct engine_node *node);
 };
 
+enum engine_node_state {
+    EN_NEW,       /* Node is not initialized yet. */
+    EN_STALE,     /* Data in the node is not up to date with the DB. */
+    EN_UPDATED,   /* Data in the node is valid but was updated during the
+                   * last run.
+                   */
+    EN_VALID,     /* Data in the node is valid and didn't change during the
+                   * last run.
+                   */
+    EN_ABORTED,   /* During the last run, processing was aborted for
+                   * this node.
+                   */
+    EN_DESTROYED, /* The node was cleaned up. */
+    EN_STATE_MAX,
+};
+
 struct engine_node {
     /* A unique id to distinguish each iteration of the engine_run(). */
     uint64_t run_id;
@@ -102,11 +118,11 @@  struct engine_node {
      * node. */
     void *data;
 
-    /* Whether the data changed in the last engine run. */
-    bool changed;
+    /* State of the node after the last engine run. */
+    enum engine_node_state state;
 
     /* Method to initialize data. It may be NULL. */
-    void (*init)(struct engine_node *);
+    void (*init)(struct engine_node *, void *arg);
 
     /* Method to clean up data. It may be NULL. */
     void (*cleanup)(struct engine_node *);
@@ -117,22 +133,24 @@  struct engine_node {
 };
 
 /* Initialize the data for the engine nodes recursively. It calls each node's
- * init() method if not NULL. It should be called before the main loop. */
-void engine_init(struct engine_node *);
+ * init() method if not NULL. It should be called before the main loop.
+ * 'arg' is user provided and is passed to all of the nodes' init handlers.
+ */
+void engine_init(struct engine_node *, void *arg);
 
 /* Execute the processing recursively, which should be called in the main
- * loop. Returns true if the execution is compelte, false if it is aborted,
- * which could happen when engine_abort_recompute is set. */
-bool engine_run(struct engine_node *, uint64_t run_id);
+ * loop. Updates the engine node's states accordingly.
+ */
+void engine_run(struct engine_node *, uint64_t run_id);
 
 /* Clean up the data for the engine nodes recursively. It calls each node's
  * cleanup() method if not NULL. It should be called before the program
  * terminates. */
 void engine_cleanup(struct engine_node *);
 
-/* Check if engine needs to run, i.e. any change to be processed. */
+/* Check if the engine needs to run but didn't in the 'run_id' iteration. */
 bool
-engine_need_run(struct engine_node *);
+engine_need_run(struct engine_node *, uint64_t run_id);
 
 /* Get the input node with <name> for <node> */
 struct engine_node * engine_get_input(const char *input_name,
@@ -159,6 +177,32 @@  const struct engine_context * engine_get_context(void);
 
 void engine_set_context(const struct engine_context *);
 
+void engine_set_node_state_at(struct engine_node *node,
+                              enum engine_node_state state,
+                              const char *where);
+
+/* Return true if the node is "new" (i.e., uninitialized). */
+bool engine_node_new(struct engine_node *node);
+
+/* Return true if the node was already destroyed. */
+bool engine_node_destroyed(struct engine_node *node);
+
+/* Return true if the node's data is up to date with the database contents. */
+bool engine_node_valid(struct engine_node *node, uint64_t run_id);
+
+/* Return true if during the 'run_id' iteration the node's data was updated. */
+bool engine_node_changed(struct engine_node *node, uint64_t run_id);
+
+/* Return true if the engine has run for 'node' in the 'run_id' iteration. */
+bool engine_has_run(struct engine_node *node, uint64_t run_id);
+
+/* Return true if during the last engine run we had to abort processing. */
+bool engine_aborted(struct engine_node *node);
+
+/* Set the state of the node and log changes. */
+#define engine_set_node_state(node, state) \
+    engine_set_node_state_at(node, state, OVS_SOURCE_LOCATOR)
+
 struct ed_ovsdb_index {
     const char *name;
     struct ovsdb_idl_index *index;
@@ -184,6 +228,7 @@  void engine_ovsdb_node_add_index(struct engine_node *, const char *name,
     struct engine_node en_##NAME = { \
         .name = NAME_STR, \
         .data = &ed_##NAME, \
+        .state = EN_NEW, \
         .init = en_##NAME##_init, \
         .run = en_##NAME##_run, \
         .cleanup = en_##NAME##_cleanup, \
@@ -198,12 +243,13 @@  en_##DB_NAME##_##TBL_NAME##_run(struct engine_node *node) \
     const struct DB_NAME##rec_##TBL_NAME##_table *table = \
         EN_OVSDB_GET(node); \
     if (DB_NAME##rec_##TBL_NAME##_table_track_get_first(table)) { \
-        node->changed = true; \
+        engine_set_node_state(node, EN_UPDATED); \
         return; \
     } \
-    node->changed = false; \
+    engine_set_node_state(node, EN_VALID); \
 } \
-static void (*en_##DB_NAME##_##TBL_NAME##_init)(struct engine_node *node) \
+static void (*en_##DB_NAME##_##TBL_NAME##_init)(struct engine_node *node, \
+                                                void *arg) \
             = NULL; \
 static void (*en_##DB_NAME##_##TBL_NAME##_cleanup)(struct engine_node *node) \
             = NULL;
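
For completeness, a hedged sketch of a node init handler under the new
init(node, arg) signature, wiring per-node pointers to the shared storage
that main() passes to engine_init(). The layout of 'struct engine_storage' is
only partially visible in this patch, so the field names below are
assumptions based on the flow_output hunks above, and the function itself is
illustrative rather than the patch's actual implementation:

    /* Sketch only: an init handler under the new two-argument signature.
     * 'arg' is the 'struct engine_storage' handed to engine_init(). */
    static void
    example_flow_output_init(struct engine_node *node, void *arg)
    {
        struct ed_type_flow_output *fo = node->data;
        struct engine_storage *storage = arg;

        /* Point the node at the globally owned tables so they remain
         * usable even while the node itself is EN_STALE. */
        fo->flow_table = &storage->flow_table;
        fo->group_table = &storage->group_table;
        fo->meter_table = &storage->meter_table;
    }
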