
[ovs-dev,v8,1/4] ovn-controller: Split logical flow and physical flow processing.

Message ID: 20210531034506.1072096-1-numans@ovn.org
State: Changes Requested
Series: ovn-controller: Split logical flow and physical flow processing

Commit Message

Numan Siddique May 31, 2021, 3:45 a.m. UTC
From: Numan Siddique <numans@ovn.org>

Presently, the 'flow_output' engine node recomputes physical
flows by calling physical_run() in the 'physical_flow_changes'
handler in some scenarios.  Because of this, an engine run can
do a full recompute of physical flows but not a full recompute
of logical flows.  Although this works now, it is problematic
because the same desired flow table is used for both physical
and logical flows.

This patch separates the handling of logical flows and
physical flows and removes the 'physical_flow_changes' engine
node.  Two separate engine nodes are added - lflow_output and
pflow_output - each with its own flow table, and these two nodes
are now inputs to the main engine node - flow_output.  This
separation reflects the data dependency more clearly.
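
The new wiring, in rough outline (an illustrative sketch only,
condensed from the engine_add_input() calls added further down in
this patch; the full set of inputs is larger):

    /* Each of the new nodes computes into its own desired flow table. */
    engine_add_input(&en_lflow_output, &en_sb_logical_flow,
                     lflow_output_sb_logical_flow_handler);
    engine_add_input(&en_pflow_output, &en_sb_port_binding,
                     pflow_output_sb_port_binding_handler);

    /* flow_output only aggregates the two nodes; ofctrl_put() is then
     * called with both desired flow tables. */
    engine_add_input(&en_flow_output, &en_lflow_output,
                     flow_output_lflow_output_handler);
    engine_add_input(&en_flow_output, &en_pflow_output,
                     flow_output_pflow_output_handler);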

CC: Han Zhou <hzhou@ovn.org>
Signed-off-by: Numan Siddique <numans@ovn.org>
---
 TODO.rst                    |   6 +
 controller/ofctrl.c         |  99 +++--
 controller/ofctrl.h         |   6 +-
 controller/ovn-controller.c | 712 ++++++++++++++++++------------------
 controller/physical.c       |  19 -
 controller/physical.h       |   4 -
 6 files changed, 429 insertions(+), 417 deletions(-)

Comments

Numan Siddique June 2, 2021, 12:10 a.m. UTC | #1
On Sun, May 30, 2021 at 11:45 PM <numans@ovn.org> wrote:
>
> From: Numan Siddique <numans@ovn.org>
>
> Presently, the 'flow_output' engine node recomputes physical
> flows by calling physical_run() in the 'physical_flow_changes'
> handler in some scenarios.  Because of this, an engine run can
> do a full recompute of physical flows but not a full recompute
> of logical flows.  Although this works now, it is problematic
> because the same desired flow table is used for both physical
> and logical flows.
>
> This patch separates the handling of logical flows and
> physical flows and removes the 'physical_flow_changes' engine
> node.  Two separate engine nodes are added - lflow_output and
> pflow_output - each with its own flow table, and these two nodes
> are now inputs to the main engine node - flow_output.  This
> separation reflects the data dependency more clearly.
>
> CC: Han Zhou <hzhou@ovn.org>
> Signed-off-by: Numan Siddique <numans@ovn.org>

Hi Han,

Gentle ping.  Wondering if you got the chance to take a look at the
first patch of the series.

It would be great if the first patch could be considered before we
branch (or before the 21.06 release).

Thanks
Numan

> ---
>  TODO.rst                    |   6 +
>  controller/ofctrl.c         |  99 +++--
>  controller/ofctrl.h         |   6 +-
>  controller/ovn-controller.c | 712 ++++++++++++++++++------------------
>  controller/physical.c       |  19 -
>  controller/physical.h       |   4 -
>  6 files changed, 429 insertions(+), 417 deletions(-)
>
> diff --git a/TODO.rst b/TODO.rst
> index c89fe203e1..618ea4844a 100644
> --- a/TODO.rst
> +++ b/TODO.rst
> @@ -164,3 +164,9 @@ OVN To-do List
>      to find a way of determining if routing has already been executed (on a
>      different hypervisor) for the IP multicast packet being processed locally
>      in the router pipeline.
> +
> +* ovn-controller Incremental processing
> +
> +  * physical.c has a global simap - localvif_to_ofport - which stores the
> +    local OVS interfaces and their ofport numbers. Move this to the engine
> +    data of the engine node - ed_type_pflow_output.
> diff --git a/controller/ofctrl.c b/controller/ofctrl.c
> index c29c3d1805..053631590b 100644
> --- a/controller/ofctrl.c
> +++ b/controller/ofctrl.c
> @@ -173,7 +173,7 @@ struct sb_flow_ref {
>      struct uuid sb_uuid;
>  };
>
> -/* A installed flow, in static variable installed_flows.
> +/* An installed flow, in static variable installed_lflows/installed_pflows.
>   *
>   * Installed flows are updated in ofctrl_put for maintaining the flow
>   * installation to OVS. They are updated according to desired flows: either by
> @@ -234,7 +234,7 @@ static struct desired_flow *desired_flow_lookup_conjunctive(
>  static void desired_flow_destroy(struct desired_flow *);
>
>  static struct installed_flow *installed_flow_lookup(
> -    const struct ovn_flow *target);
> +    const struct ovn_flow *target, struct hmap *installed_flows);
>  static void installed_flow_destroy(struct installed_flow *);
>  static struct installed_flow *installed_flow_dup(struct desired_flow *);
>  static struct desired_flow *installed_flow_get_active(struct installed_flow *);
> @@ -302,9 +302,12 @@ static ovs_be32 xid, xid2;
>   * zero, to avoid unbounded buffering. */
>  static struct rconn_packet_counter *tx_counter;
>
> -/* Flow table of "struct ovn_flow"s, that holds the flow table currently
> - * installed in the switch. */
> -static struct hmap installed_flows;
> +/* Flow table of "struct ovn_flow"s, that holds the logical flow table
> + * currently installed in the switch. */
> +static struct hmap installed_lflows;
> +/* Flow table of "struct ovn_flow"s, that holds the physical flow table
> + * currently installed in the switch. */
> +static struct hmap installed_pflows;
>
>  /* A reference to the group_table. */
>  static struct ovn_extend_table *groups;
> @@ -343,7 +346,8 @@ ofctrl_init(struct ovn_extend_table *group_table,
>      swconn = rconn_create(inactivity_probe_interval, 0,
>                            DSCP_DEFAULT, 1 << OFP15_VERSION);
>      tx_counter = rconn_packet_counter_create();
> -    hmap_init(&installed_flows);
> +    hmap_init(&installed_lflows);
> +    hmap_init(&installed_pflows);
>      ovs_list_init(&flow_updates);
>      ovn_init_symtab(&symtab);
>      groups = group_table;
> @@ -1426,11 +1430,12 @@ desired_flow_lookup_conjunctive(struct ovn_desired_flow_table *flow_table,
>  /* Finds and returns an installed_flow in installed_flows whose key is
>   * identical to 'target''s key, or NULL if there is none. */
>  static struct installed_flow *
> -installed_flow_lookup(const struct ovn_flow *target)
> +installed_flow_lookup(const struct ovn_flow *target,
> +                      struct hmap *installed_flows)
>  {
>      struct installed_flow *i;
>      HMAP_FOR_EACH_WITH_HASH (i, match_hmap_node, target->hash,
> -                             &installed_flows) {
> +                             installed_flows) {
>          struct ovn_flow *f = &i->flow;
>          if (f->table_id == target->table_id
>              && f->priority == target->priority
> @@ -1542,8 +1547,14 @@ static void
>  ovn_installed_flow_table_clear(void)
>  {
>      struct installed_flow *f, *next;
> -    HMAP_FOR_EACH_SAFE (f, next, match_hmap_node, &installed_flows) {
> -        hmap_remove(&installed_flows, &f->match_hmap_node);
> +    HMAP_FOR_EACH_SAFE (f, next, match_hmap_node, &installed_lflows) {
> +        hmap_remove(&installed_lflows, &f->match_hmap_node);
> +        unlink_all_refs_for_installed_flow(f);
> +        installed_flow_destroy(f);
> +    }
> +
> +    HMAP_FOR_EACH_SAFE (f, next, match_hmap_node, &installed_pflows) {
> +        hmap_remove(&installed_pflows, &f->match_hmap_node);
>          unlink_all_refs_for_installed_flow(f);
>          installed_flow_destroy(f);
>      }
> @@ -1553,7 +1564,8 @@ static void
>  ovn_installed_flow_table_destroy(void)
>  {
>      ovn_installed_flow_table_clear();
> -    hmap_destroy(&installed_flows);
> +    hmap_destroy(&installed_lflows);
> +    hmap_destroy(&installed_pflows);
>  }
>
>  /* Flow table update. */
> @@ -1829,6 +1841,7 @@ installed_flow_del(struct ovn_flow *i,
>  static void
>  update_installed_flows_by_compare(struct ovn_desired_flow_table *flow_table,
>                                    struct ofputil_bundle_ctrl_msg *bc,
> +                                  struct hmap *installed_flows,
>                                    struct ovs_list *msgs)
>  {
>      ovs_assert(ovs_list_is_empty(&flow_table->tracked_flows));
> @@ -1836,7 +1849,7 @@ update_installed_flows_by_compare(struct ovn_desired_flow_table *flow_table,
>       * longer desired, delete them; if any of them should have different
>       * actions, update them. */
>      struct installed_flow *i, *next;
> -    HMAP_FOR_EACH_SAFE (i, next, match_hmap_node, &installed_flows) {
> +    HMAP_FOR_EACH_SAFE (i, next, match_hmap_node, installed_flows) {
>          unlink_all_refs_for_installed_flow(i);
>          struct desired_flow *d = desired_flow_lookup(flow_table, &i->flow);
>          if (!d) {
> @@ -1845,7 +1858,7 @@ update_installed_flows_by_compare(struct ovn_desired_flow_table *flow_table,
>              installed_flow_del(&i->flow, bc, msgs);
>              ovn_flow_log(&i->flow, "removing installed");
>
> -            hmap_remove(&installed_flows, &i->match_hmap_node);
> +            hmap_remove(installed_flows, &i->match_hmap_node);
>              installed_flow_destroy(i);
>          } else {
>              if (!ofpacts_equal(i->flow.ofpacts, i->flow.ofpacts_len,
> @@ -1863,14 +1876,14 @@ update_installed_flows_by_compare(struct ovn_desired_flow_table *flow_table,
>       * in the installed flow table. */
>      struct desired_flow *d;
>      HMAP_FOR_EACH (d, match_hmap_node, &flow_table->match_flow_table) {
> -        i = installed_flow_lookup(&d->flow);
> +        i = installed_flow_lookup(&d->flow, installed_flows);
>          if (!i) {
>              ovn_flow_log(&d->flow, "adding installed");
>              installed_flow_add(&d->flow, bc, msgs);
>
>              /* Copy 'd' from 'flow_table' to installed_flows. */
>              i = installed_flow_dup(d);
> -            hmap_insert(&installed_flows, &i->match_hmap_node, i->flow.hash);
> +            hmap_insert(installed_flows, &i->match_hmap_node, i->flow.hash);
>              link_installed_to_desired(i, d);
>          } else if (!d->installed_flow) {
>              /* This is a desired_flow that conflicts with one installed
> @@ -1961,6 +1974,7 @@ merge_tracked_flows(struct ovn_desired_flow_table *flow_table)
>  static void
>  update_installed_flows_by_track(struct ovn_desired_flow_table *flow_table,
>                                  struct ofputil_bundle_ctrl_msg *bc,
> +                                struct hmap *installed_flows,
>                                  struct ovs_list *msgs)
>  {
>      merge_tracked_flows(flow_table);
> @@ -1979,7 +1993,7 @@ update_installed_flows_by_track(struct ovn_desired_flow_table *flow_table,
>                      installed_flow_del(&i->flow, bc, msgs);
>                      ovn_flow_log(&i->flow, "removing installed (tracked)");
>
> -                    hmap_remove(&installed_flows, &i->match_hmap_node);
> +                    hmap_remove(installed_flows, &i->match_hmap_node);
>                      installed_flow_destroy(i);
>                  } else if (was_active) {
>                      /* There are other desired flow(s) referencing this
> @@ -1993,7 +2007,8 @@ update_installed_flows_by_track(struct ovn_desired_flow_table *flow_table,
>              desired_flow_destroy(f);
>          } else {
>              /* The desired flow was added or modified. */
> -            struct installed_flow *i = installed_flow_lookup(&f->flow);
> +            struct installed_flow *i = installed_flow_lookup(&f->flow,
> +                                                             installed_flows);
>              if (!i) {
>                  /* Adding a new flow. */
>                  installed_flow_add(&f->flow, bc, msgs);
> @@ -2001,7 +2016,7 @@ update_installed_flows_by_track(struct ovn_desired_flow_table *flow_table,
>
>                  /* Copy 'f' from 'flow_table' to installed_flows. */
>                  struct installed_flow *new_node = installed_flow_dup(f);
> -                hmap_insert(&installed_flows, &new_node->match_hmap_node,
> +                hmap_insert(installed_flows, &new_node->match_hmap_node,
>                              new_node->flow.hash);
>                  link_installed_to_desired(new_node, f);
>              } else if (installed_flow_get_active(i) == f) {
> @@ -2055,16 +2070,19 @@ ofctrl_can_put(void)
>   *
>   * This should be called after ofctrl_run() within the main loop. */
>  void
> -ofctrl_put(struct ovn_desired_flow_table *flow_table,
> +ofctrl_put(struct ovn_desired_flow_table *lflow_table,
> +           struct ovn_desired_flow_table *pflow_table,
>             struct shash *pending_ct_zones,
>             const struct sbrec_meter_table *meter_table,
>             uint64_t req_cfg,
> -           bool flow_changed)
> +           bool lflows_changed,
> +           bool pflows_changed)
>  {
>      static bool skipped_last_time = false;
>      static uint64_t old_req_cfg = 0;
>      bool need_put = false;
> -    if (flow_changed || skipped_last_time || need_reinstall_flows) {
> +    if (lflows_changed || pflows_changed || skipped_last_time ||
> +        need_reinstall_flows) {
>          need_put = true;
>          old_req_cfg = req_cfg;
>      } else if (req_cfg != old_req_cfg) {
> @@ -2093,7 +2111,6 @@ ofctrl_put(struct ovn_desired_flow_table *flow_table,
>          return;
>      }
>
> -    skipped_last_time = false;
>      need_reinstall_flows = false;
>
>      /* OpenFlow messages to send to the switch to bring it up-to-date. */
> @@ -2159,12 +2176,35 @@ ofctrl_put(struct ovn_desired_flow_table *flow_table,
>      bundle_open = ofputil_encode_bundle_ctrl_request(OFP15_VERSION, &bc);
>      ovs_list_push_back(&msgs, &bundle_open->list_node);
>
> -    if (flow_table->change_tracked) {
> -        update_installed_flows_by_track(flow_table, &bc, &msgs);
> -    } else {
> -        update_installed_flows_by_compare(flow_table, &bc, &msgs);
> +    /* If the previous run was skipped, process the (tracked) flows
> +     * in the flow table even if lflows_changed is not set.
> +     * Same for pflows_changed. */
> +    if (lflows_changed || skipped_last_time) {
> +        if (lflow_table->change_tracked) {
> +            update_installed_flows_by_track(lflow_table, &bc,
> +                                            &installed_lflows,
> +                                            &msgs);
> +        } else {
> +            update_installed_flows_by_compare(lflow_table, &bc,
> +                                              &installed_lflows,
> +                                              &msgs);
> +        }
> +    }
> +
> +    if (pflows_changed || skipped_last_time) {
> +        if (pflow_table->change_tracked) {
> +            update_installed_flows_by_track(pflow_table, &bc,
> +                                            &installed_pflows,
> +                                            &msgs);
> +        } else {
> +            update_installed_flows_by_compare(pflow_table, &bc,
> +                                              &installed_pflows,
> +                                              &msgs);
> +        }
>      }
>
> +    skipped_last_time = false;
> +
>      if (ovs_list_back(&msgs) == &bundle_open->list_node) {
>          /* No flow updates.  Removing the bundle open request. */
>          ovs_list_pop_back(&msgs);
> @@ -2287,8 +2327,11 @@ ofctrl_put(struct ovn_desired_flow_table *flow_table,
>          cur_cfg = req_cfg;
>      }
>
> -    flow_table->change_tracked = true;
> -    ovs_assert(ovs_list_is_empty(&flow_table->tracked_flows));
> +    lflow_table->change_tracked = true;
> +    ovs_assert(ovs_list_is_empty(&lflow_table->tracked_flows));
> +
> +    pflow_table->change_tracked = true;
> +    ovs_assert(ovs_list_is_empty(&pflow_table->tracked_flows));
>  }
>
>  /* Looks up the logical port with the name 'port_name' in 'br_int_'.  If
> diff --git a/controller/ofctrl.h b/controller/ofctrl.h
> index 88769566ac..ead8088c5b 100644
> --- a/controller/ofctrl.h
> +++ b/controller/ofctrl.h
> @@ -52,11 +52,13 @@ void ofctrl_init(struct ovn_extend_table *group_table,
>  void ofctrl_run(const struct ovsrec_bridge *br_int,
>                  struct shash *pending_ct_zones);
>  enum mf_field_id ofctrl_get_mf_field_id(void);
> -void ofctrl_put(struct ovn_desired_flow_table *,
> +void ofctrl_put(struct ovn_desired_flow_table *lflow_table,
> +                struct ovn_desired_flow_table *pflow_table,
>                  struct shash *pending_ct_zones,
>                  const struct sbrec_meter_table *,
>                  uint64_t nb_cfg,
> -                bool flow_changed);
> +                bool lflow_changed,
> +                bool pflow_changed);
>  bool ofctrl_can_put(void);
>  void ofctrl_wait(void);
>  void ofctrl_destroy(void);
> diff --git a/controller/ovn-controller.c b/controller/ovn-controller.c
> index d48ddc7a27..e3051189b1 100644
> --- a/controller/ovn-controller.c
> +++ b/controller/ovn-controller.c
> @@ -563,7 +563,7 @@ add_pending_ct_zone_entry(struct shash *pending_ct_zones,
>  static void
>  update_ct_zones(const struct sset *lports, const struct hmap *local_datapaths,
>                  struct simap *ct_zones, unsigned long *ct_zone_bitmap,
> -                struct shash *pending_ct_zones, struct hmapx *updated_dps)
> +                struct shash *pending_ct_zones)
>  {
>      struct simap_node *ct_zone, *ct_zone_next;
>      int scan_start = 1;
> @@ -653,11 +653,6 @@ update_ct_zones(const struct sset *lports, const struct hmap *local_datapaths,
>
>          bitmap_set1(ct_zone_bitmap, snat_req_node->data);
>          simap_put(ct_zones, snat_req_node->name, snat_req_node->data);
> -        struct shash_node *ld_node = shash_find(&all_lds, snat_req_node->name);
> -        if (ld_node) {
> -            struct local_datapath *dp = ld_node->data;
> -            hmapx_add(updated_dps, (void *) dp->datapath);
> -        }
>      }
>
>      /* xxx This is wasteful to assign a zone to each port--even if no
> @@ -686,12 +681,6 @@ update_ct_zones(const struct sset *lports, const struct hmap *local_datapaths,
>
>          bitmap_set1(ct_zone_bitmap, zone);
>          simap_put(ct_zones, user, zone);
> -
> -        struct shash_node *ld_node = shash_find(&all_lds, user);
> -        if (ld_node) {
> -            struct local_datapath *dp = ld_node->data;
> -            hmapx_add(updated_dps, (void *) dp->datapath);
> -        }
>      }
>
>      simap_destroy(&req_snat_zones);
> @@ -983,9 +972,6 @@ struct ed_type_runtime_data {
>      bool tracked;
>      bool local_lports_changed;
>      struct hmap tracked_dp_bindings;
> -
> -    /* CT zone data. Contains datapaths that had updated CT zones */
> -    struct hmapx ct_updated_datapaths;
>  };
>
>  /* struct ed_type_runtime_data has the below members for tracking the
> @@ -1077,8 +1063,6 @@ en_runtime_data_init(struct engine_node *node OVS_UNUSED,
>      /* Init the tracked data. */
>      hmap_init(&data->tracked_dp_bindings);
>
> -    hmapx_init(&data->ct_updated_datapaths);
> -
>      return data;
>  }
>
> @@ -1101,7 +1085,6 @@ en_runtime_data_cleanup(void *data)
>      }
>      hmap_destroy(&rt_data->local_datapaths);
>      local_binding_data_destroy(&rt_data->lbinding_data);
> -    hmapx_destroy(&rt_data->ct_updated_datapaths);
>  }
>
>  static void
> @@ -1224,7 +1207,6 @@ en_runtime_data_run(struct engine_node *node, void *data)
>          sset_init(&rt_data->egress_ifaces);
>          smap_init(&rt_data->local_iface_ids);
>          local_binding_data_init(&rt_data->lbinding_data);
> -        hmapx_clear(&rt_data->ct_updated_datapaths);
>      }
>
>      struct binding_ctx_in b_ctx_in;
> @@ -1744,10 +1726,9 @@ en_ct_zones_run(struct engine_node *node, void *data)
>      struct ed_type_runtime_data *rt_data =
>          engine_get_input_data("runtime_data", node);
>
> -    hmapx_clear(&rt_data->ct_updated_datapaths);
>      update_ct_zones(&rt_data->local_lports, &rt_data->local_datapaths,
>                      &ct_zones_data->current, ct_zones_data->bitmap,
> -                    &ct_zones_data->pending, &rt_data->ct_updated_datapaths);
> +                    &ct_zones_data->pending);
>
>
>      engine_set_node_state(node, EN_UPDATED);
> @@ -1790,107 +1771,13 @@ en_mff_ovn_geneve_run(struct engine_node *node, void *data)
>      engine_set_node_state(node, EN_UNCHANGED);
>  }
>
> -/* Engine node en_physical_flow_changes indicates whether
> - * there is a need to
> - *   - recompute only physical flows or
> - *   - we can incrementally process the physical flows.
> - *
> - * en_physical_flow_changes is an input to flow_output engine node.
> - * If the engine node 'en_physical_flow_changes' gets updated during
> - * engine run, it means the handler for this -
> - * flow_output_physical_flow_changes_handler() will either
> - *    - recompute the physical flows by calling 'physical_run() or
> - *    - incrementlly process some of the changes for physical flow
> - *      calculation. Right now we handle OVS interfaces changes
> - *      for physical flow computation.
> - *
> - * When ever a port binding happens, the follow up
> - * activity is the zone id allocation for that port binding.
> - * With this intermediate engine node, we avoid full recomputation.
> - * Instead we do physical flow computation (either full recomputation
> - * by calling physical_run() or handling the changes incrementally.
> - *
> - * Hence this is an intermediate engine node to indicate the
> - * flow_output engine to recomputes/compute the physical flows.
> - *
> - * TODO 1. Ideally this engine node should recompute/compute the physical
> - *         flows instead of relegating it to the flow_output node.
> - *         But this requires splitting the flow_output node to
> - *         logical_flow_output and physical_flow_output.
> - *
> - * TODO 2. We can further optimise the en_ct_zone changes to
> - *         compute the phsyical flows for changed zone ids.
> - *
> - * TODO 3: physical.c has a global simap -localvif_to_ofport which stores the
> - *         local OVS interfaces and the ofport numbers. Ideally this should be
> - *         part of the engine data.
> - */
> -struct ed_type_pfc_data {
> -    /* Both these variables are tracked and set in each engine run. */
> -    bool recompute_physical_flows;
> -    bool ovs_ifaces_changed;
> -};
> -
> -static void
> -en_physical_flow_changes_clear_tracked_data(void *data_)
> -{
> -    struct ed_type_pfc_data *data = data_;
> -    data->recompute_physical_flows = false;
> -    data->ovs_ifaces_changed = false;
> -}
> -
> -static void *
> -en_physical_flow_changes_init(struct engine_node *node OVS_UNUSED,
> -                              struct engine_arg *arg OVS_UNUSED)
> -{
> -    struct ed_type_pfc_data *data = xzalloc(sizeof *data);
> -    return data;
> -}
> -
> -static void
> -en_physical_flow_changes_cleanup(void *data OVS_UNUSED)
> -{
> -}
> -
> -/* Indicate to the flow_output engine that we need to recompute physical
> - * flows. */
> -static void
> -en_physical_flow_changes_run(struct engine_node *node, void *data)
> -{
> -    struct ed_type_pfc_data *pfc_tdata = data;
> -    pfc_tdata->recompute_physical_flows = true;
> -    pfc_tdata->ovs_ifaces_changed = true;
> -    engine_set_node_state(node, EN_UPDATED);
> -}
> -
> -/* ct_zone changes are not handled incrementally but a handler is required
> - * to avoid skipping the ovs_iface incremental change handler.
> - */
> -static bool
> -physical_flow_changes_ct_zones_handler(struct engine_node *node OVS_UNUSED,
> -                                       void *data OVS_UNUSED)
> -{
> -    return false;
> -}
> -
> -/* There are OVS interface changes. Indicate to the flow_output engine
> - * to handle these OVS interface changes for physical flow computations. */
> -static bool
> -physical_flow_changes_ovs_iface_handler(struct engine_node *node, void *data)
> -{
> -    struct ed_type_pfc_data *pfc_tdata = data;
> -    pfc_tdata->ovs_ifaces_changed = true;
> -    engine_set_node_state(node, EN_UPDATED);
> -    return true;
> -}
> -
> -struct flow_output_persistent_data {
> +struct lflow_output_persistent_data {
>      uint32_t conj_id_ofs;
>      struct lflow_cache *lflow_cache;
>  };
>
> -struct ed_type_flow_output {
> -    /* desired flows */
> +struct ed_type_lflow_output {
> +    /* Logical flow table */
>      struct ovn_desired_flow_table flow_table;
>      /* group ids for load balancing */
>      struct ovn_extend_table group_table;
> @@ -1901,81 +1788,15 @@ struct ed_type_flow_output {
>
>      /* Data which is persistent and not cleared during
>       * full recompute. */
> -    struct flow_output_persistent_data pd;
> +    struct lflow_output_persistent_data pd;
>  };
>
> -static void init_physical_ctx(struct engine_node *node,
> -                              struct ed_type_runtime_data *rt_data,
> -                              struct physical_ctx *p_ctx)
> -{
> -    struct ovsdb_idl_index *sbrec_port_binding_by_name =
> -        engine_ovsdb_node_get_index(
> -                engine_get_input("SB_port_binding", node),
> -                "name");
> -
> -    struct sbrec_multicast_group_table *multicast_group_table =
> -        (struct sbrec_multicast_group_table *)EN_OVSDB_GET(
> -            engine_get_input("SB_multicast_group", node));
> -
> -    struct sbrec_port_binding_table *port_binding_table =
> -        (struct sbrec_port_binding_table *)EN_OVSDB_GET(
> -            engine_get_input("SB_port_binding", node));
> -
> -    struct sbrec_chassis_table *chassis_table =
> -        (struct sbrec_chassis_table *)EN_OVSDB_GET(
> -            engine_get_input("SB_chassis", node));
> -
> -    struct ed_type_mff_ovn_geneve *ed_mff_ovn_geneve =
> -        engine_get_input_data("mff_ovn_geneve", node);
> -
> -    struct ovsrec_open_vswitch_table *ovs_table =
> -        (struct ovsrec_open_vswitch_table *)EN_OVSDB_GET(
> -            engine_get_input("OVS_open_vswitch", node));
> -    struct ovsrec_bridge_table *bridge_table =
> -        (struct ovsrec_bridge_table *)EN_OVSDB_GET(
> -            engine_get_input("OVS_bridge", node));
> -    const struct ovsrec_bridge *br_int = get_br_int(bridge_table, ovs_table);
> -    const char *chassis_id = get_ovs_chassis_id(ovs_table);
> -    const struct sbrec_chassis *chassis = NULL;
> -    struct ovsdb_idl_index *sbrec_chassis_by_name =
> -        engine_ovsdb_node_get_index(
> -                engine_get_input("SB_chassis", node),
> -                "name");
> -    if (chassis_id) {
> -        chassis = chassis_lookup_by_name(sbrec_chassis_by_name, chassis_id);
> -    }
> -
> -    ovs_assert(br_int && chassis);
> -
> -    struct ovsrec_interface_table *iface_table =
> -        (struct ovsrec_interface_table *)EN_OVSDB_GET(
> -            engine_get_input("OVS_interface", node));
> -
> -    struct ed_type_ct_zones *ct_zones_data =
> -        engine_get_input_data("ct_zones", node);
> -    struct simap *ct_zones = &ct_zones_data->current;
> -
> -    p_ctx->sbrec_port_binding_by_name = sbrec_port_binding_by_name;
> -    p_ctx->port_binding_table = port_binding_table;
> -    p_ctx->mc_group_table = multicast_group_table;
> -    p_ctx->br_int = br_int;
> -    p_ctx->chassis_table = chassis_table;
> -    p_ctx->iface_table = iface_table;
> -    p_ctx->chassis = chassis;
> -    p_ctx->active_tunnels = &rt_data->active_tunnels;
> -    p_ctx->local_datapaths = &rt_data->local_datapaths;
> -    p_ctx->local_lports = &rt_data->local_lports;
> -    p_ctx->ct_zones = ct_zones;
> -    p_ctx->mff_ovn_geneve = ed_mff_ovn_geneve->mff_ovn_geneve;
> -    p_ctx->local_bindings = &rt_data->lbinding_data.bindings;
> -    p_ctx->ct_updated_datapaths = &rt_data->ct_updated_datapaths;
> -}
> -
> -static void init_lflow_ctx(struct engine_node *node,
> -                           struct ed_type_runtime_data *rt_data,
> -                           struct ed_type_flow_output *fo,
> -                           struct lflow_ctx_in *l_ctx_in,
> -                           struct lflow_ctx_out *l_ctx_out)
> +static void
> +init_lflow_ctx(struct engine_node *node,
> +               struct ed_type_runtime_data *rt_data,
> +               struct ed_type_lflow_output *fo,
> +               struct lflow_ctx_in *l_ctx_in,
> +               struct lflow_ctx_out *l_ctx_out)
>  {
>      struct ovsdb_idl_index *sbrec_port_binding_by_name =
>          engine_ovsdb_node_get_index(
> @@ -2085,11 +1906,10 @@ static void init_lflow_ctx(struct engine_node *node,
>  }
>
>  static void *
> -en_flow_output_init(struct engine_node *node OVS_UNUSED,
> -                    struct engine_arg *arg OVS_UNUSED)
> +en_lflow_output_init(struct engine_node *node OVS_UNUSED,
> +                     struct engine_arg *arg OVS_UNUSED)
>  {
> -    struct ed_type_flow_output *data = xzalloc(sizeof *data);
> -
> +    struct ed_type_lflow_output *data = xzalloc(sizeof *data);
>      ovn_desired_flow_table_init(&data->flow_table);
>      ovn_extend_table_init(&data->group_table);
>      ovn_extend_table_init(&data->meter_table);
> @@ -2099,9 +1919,9 @@ en_flow_output_init(struct engine_node *node OVS_UNUSED,
>  }
>
>  static void
> -en_flow_output_cleanup(void *data)
> +en_lflow_output_cleanup(void *data)
>  {
> -    struct ed_type_flow_output *flow_output_data = data;
> +    struct ed_type_lflow_output *flow_output_data = data;
>      ovn_desired_flow_table_destroy(&flow_output_data->flow_table);
>      ovn_extend_table_destroy(&flow_output_data->group_table);
>      ovn_extend_table_destroy(&flow_output_data->meter_table);
> @@ -2110,7 +1930,7 @@ en_flow_output_cleanup(void *data)
>  }
>
>  static void
> -en_flow_output_run(struct engine_node *node, void *data)
> +en_lflow_output_run(struct engine_node *node, void *data)
>  {
>      struct ed_type_runtime_data *rt_data =
>          engine_get_input_data("runtime_data", node);
> @@ -2136,8 +1956,8 @@ en_flow_output_run(struct engine_node *node, void *data)
>
>      ovs_assert(br_int && chassis);
>
> -    struct ed_type_flow_output *fo = data;
> -    struct ovn_desired_flow_table *flow_table = &fo->flow_table;
> +    struct ed_type_lflow_output *fo = data;
> +    struct ovn_desired_flow_table *lflow_table = &fo->flow_table;
>      struct ovn_extend_table *group_table = &fo->group_table;
>      struct ovn_extend_table *meter_table = &fo->meter_table;
>      struct lflow_resource_ref *lfrr = &fo->lflow_resource_ref;
> @@ -2146,7 +1966,7 @@ en_flow_output_run(struct engine_node *node, void *data)
>      if (first_run) {
>          first_run = false;
>      } else {
> -        ovn_desired_flow_table_clear(flow_table);
> +        ovn_desired_flow_table_clear(lflow_table);
>          ovn_extend_table_clear(group_table, false /* desired */);
>          ovn_extend_table_clear(meter_table, false /* desired */);
>          lflow_resource_clear(lfrr);
> @@ -2168,7 +1988,7 @@ en_flow_output_run(struct engine_node *node, void *data)
>      if (l_ctx_out.conj_id_overflow) {
>          /* Conjunction ids overflow. There can be many holes in between.
>           * Destroy lflow cache and call lflow_run() again. */
> -        ovn_desired_flow_table_clear(flow_table);
> +        ovn_desired_flow_table_clear(lflow_table);
>          ovn_extend_table_clear(group_table, false /* desired */);
>          ovn_extend_table_clear(meter_table, false /* desired */);
>          lflow_resource_clear(lfrr);
> @@ -2181,16 +2001,11 @@ en_flow_output_run(struct engine_node *node, void *data)
>          }
>      }
>
> -    struct physical_ctx p_ctx;
> -    init_physical_ctx(node, rt_data, &p_ctx);
> -
> -    physical_run(&p_ctx, &fo->flow_table);
> -
>      engine_set_node_state(node, EN_UPDATED);
>  }
>
>  static bool
> -flow_output_sb_logical_flow_handler(struct engine_node *node, void *data)
> +lflow_output_sb_logical_flow_handler(struct engine_node *node, void *data)
>  {
>      struct ed_type_runtime_data *rt_data =
>          engine_get_input_data("runtime_data", node);
> @@ -2203,7 +2018,7 @@ flow_output_sb_logical_flow_handler(struct engine_node *node, void *data)
>      const struct ovsrec_bridge *br_int = get_br_int(bridge_table, ovs_table);
>      ovs_assert(br_int);
>
> -    struct ed_type_flow_output *fo = data;
> +    struct ed_type_lflow_output *fo = data;
>      struct lflow_ctx_in l_ctx_in;
>      struct lflow_ctx_out l_ctx_out;
>      init_lflow_ctx(node, rt_data, fo, &l_ctx_in, &l_ctx_out);
> @@ -2215,7 +2030,7 @@ flow_output_sb_logical_flow_handler(struct engine_node *node, void *data)
>  }
>
>  static bool
> -flow_output_sb_mac_binding_handler(struct engine_node *node, void *data)
> +lflow_output_sb_mac_binding_handler(struct engine_node *node, void *data)
>  {
>      struct ovsdb_idl_index *sbrec_port_binding_by_name =
>          engine_ovsdb_node_get_index(
> @@ -2230,60 +2045,17 @@ flow_output_sb_mac_binding_handler(struct engine_node *node, void *data)
>          engine_get_input_data("runtime_data", node);
>      const struct hmap *local_datapaths = &rt_data->local_datapaths;
>
> -    struct ed_type_flow_output *fo = data;
> -    struct ovn_desired_flow_table *flow_table = &fo->flow_table;
> +    struct ed_type_lflow_output *lfo = data;
>
>      lflow_handle_changed_neighbors(sbrec_port_binding_by_name,
> -            mac_binding_table, local_datapaths, flow_table);
> +            mac_binding_table, local_datapaths, &lfo->flow_table);
>
>      engine_set_node_state(node, EN_UPDATED);
>      return true;
>  }
>
>  static bool
> -flow_output_sb_port_binding_handler(struct engine_node *node,
> -                                    void *data)
> -{
> -    struct ed_type_runtime_data *rt_data =
> -        engine_get_input_data("runtime_data", node);
> -
> -    struct ed_type_flow_output *fo = data;
> -    struct ovn_desired_flow_table *flow_table = &fo->flow_table;
> -
> -    struct physical_ctx p_ctx;
> -    init_physical_ctx(node, rt_data, &p_ctx);
> -
> -    /* We handle port-binding changes for physical flow processing
> -     * only. flow_output runtime data handler takes care of processing
> -     * logical flows for any port binding changes.
> -     */
> -    physical_handle_port_binding_changes(&p_ctx, flow_table);
> -
> -    engine_set_node_state(node, EN_UPDATED);
> -    return true;
> -}
> -
> -static bool
> -flow_output_sb_multicast_group_handler(struct engine_node *node, void *data)
> -{
> -    struct ed_type_runtime_data *rt_data =
> -        engine_get_input_data("runtime_data", node);
> -
> -    struct ed_type_flow_output *fo = data;
> -    struct ovn_desired_flow_table *flow_table = &fo->flow_table;
> -
> -    struct physical_ctx p_ctx;
> -    init_physical_ctx(node, rt_data, &p_ctx);
> -
> -    physical_handle_mc_group_changes(&p_ctx, flow_table);
> -
> -    engine_set_node_state(node, EN_UPDATED);
> -    return true;
> -
> -}
> -
> -static bool
> -_flow_output_resource_ref_handler(struct engine_node *node, void *data,
> +_lflow_output_resource_ref_handler(struct engine_node *node, void *data,
>                                    enum ref_type ref_type)
>  {
>      struct ed_type_runtime_data *rt_data =
> @@ -2315,7 +2087,7 @@ _flow_output_resource_ref_handler(struct engine_node *node, void *data,
>
>      ovs_assert(br_int && chassis);
>
> -    struct ed_type_flow_output *fo = data;
> +    struct ed_type_lflow_output *fo = data;
>
>      struct lflow_ctx_in l_ctx_in;
>      struct lflow_ctx_out l_ctx_out;
> @@ -2384,53 +2156,20 @@ _flow_output_resource_ref_handler(struct engine_node *node, void *data,
>  }
>
>  static bool
> -flow_output_addr_sets_handler(struct engine_node *node, void *data)
> +lflow_output_addr_sets_handler(struct engine_node *node, void *data)
>  {
> -    return _flow_output_resource_ref_handler(node, data, REF_TYPE_ADDRSET);
> +    return _lflow_output_resource_ref_handler(node, data, REF_TYPE_ADDRSET);
>  }
>
>  static bool
> -flow_output_port_groups_handler(struct engine_node *node, void *data)
> +lflow_output_port_groups_handler(struct engine_node *node, void *data)
>  {
> -    return _flow_output_resource_ref_handler(node, data, REF_TYPE_PORTGROUP);
> -}
> -
> -static bool
> -flow_output_physical_flow_changes_handler(struct engine_node *node, void *data)
> -{
> -    struct ed_type_runtime_data *rt_data =
> -        engine_get_input_data("runtime_data", node);
> -
> -    struct ed_type_flow_output *fo = data;
> -    struct physical_ctx p_ctx;
> -    init_physical_ctx(node, rt_data, &p_ctx);
> -
> -    engine_set_node_state(node, EN_UPDATED);
> -    struct ed_type_pfc_data *pfc_data =
> -        engine_get_input_data("physical_flow_changes", node);
> -
> -    /* If there are OVS interface changes. Try to handle them incrementally. */
> -    if (pfc_data->ovs_ifaces_changed) {
> -        if (!physical_handle_ovs_iface_changes(&p_ctx, &fo->flow_table)) {
> -            return false;
> -        }
> -    }
> -
> -    if (pfc_data->recompute_physical_flows) {
> -        /* This indicates that we need to recompute the physical flows. */
> -        physical_clear_unassoc_flows_with_db(&fo->flow_table);
> -        physical_clear_dp_flows(&p_ctx, &rt_data->ct_updated_datapaths,
> -                                &fo->flow_table);
> -        physical_run(&p_ctx, &fo->flow_table);
> -        return true;
> -    }
> -
> -    return true;
> +    return _lflow_output_resource_ref_handler(node, data, REF_TYPE_PORTGROUP);
>  }
>
>  static bool
> -flow_output_runtime_data_handler(struct engine_node *node,
> -                                 void *data OVS_UNUSED)
> +lflow_output_runtime_data_handler(struct engine_node *node,
> +                                  void *data OVS_UNUSED)
>  {
>      struct ed_type_runtime_data *rt_data =
>          engine_get_input_data("runtime_data", node);
> @@ -2451,12 +2190,9 @@ flow_output_runtime_data_handler(struct engine_node *node,
>
>      struct lflow_ctx_in l_ctx_in;
>      struct lflow_ctx_out l_ctx_out;
> -    struct ed_type_flow_output *fo = data;
> +    struct ed_type_lflow_output *fo = data;
>      init_lflow_ctx(node, rt_data, fo, &l_ctx_in, &l_ctx_out);
>
> -    struct physical_ctx p_ctx;
> -    init_physical_ctx(node, rt_data, &p_ctx);
> -
>      struct tracked_binding_datapath *tdp;
>      HMAP_FOR_EACH (tdp, node, tracked_dp_bindings) {
>          if (tdp->is_new) {
> @@ -2481,12 +2217,12 @@ flow_output_runtime_data_handler(struct engine_node *node,
>  }
>
>  static bool
> -flow_output_sb_load_balancer_handler(struct engine_node *node, void *data)
> +lflow_output_sb_load_balancer_handler(struct engine_node *node, void *data)
>  {
>      struct ed_type_runtime_data *rt_data =
>          engine_get_input_data("runtime_data", node);
>
> -    struct ed_type_flow_output *fo = data;
> +    struct ed_type_lflow_output *fo = data;
>      struct lflow_ctx_in l_ctx_in;
>      struct lflow_ctx_out l_ctx_out;
>      init_lflow_ctx(node, rt_data, fo, &l_ctx_in, &l_ctx_out);
> @@ -2498,12 +2234,12 @@ flow_output_sb_load_balancer_handler(struct engine_node *node, void *data)
>  }
>
>  static bool
> -flow_output_sb_fdb_handler(struct engine_node *node, void *data)
> +lflow_output_sb_fdb_handler(struct engine_node *node, void *data)
>  {
>      struct ed_type_runtime_data *rt_data =
>          engine_get_input_data("runtime_data", node);
>
> -    struct ed_type_flow_output *fo = data;
> +    struct ed_type_lflow_output *fo = data;
>      struct lflow_ctx_in l_ctx_in;
>      struct lflow_ctx_out l_ctx_out;
>      init_lflow_ctx(node, rt_data, fo, &l_ctx_in, &l_ctx_out);
> @@ -2514,6 +2250,230 @@ flow_output_sb_fdb_handler(struct engine_node *node, void *data)
>      return handled;
>  }
>
> +struct ed_type_pflow_output {
> +    /* Desired physical flows. */
> +    struct ovn_desired_flow_table flow_table;
> +};
> +
> +static void init_physical_ctx(struct engine_node *node,
> +                              struct ed_type_runtime_data *rt_data,
> +                              struct physical_ctx *p_ctx)
> +{
> +    struct ovsdb_idl_index *sbrec_port_binding_by_name =
> +        engine_ovsdb_node_get_index(
> +                engine_get_input("SB_port_binding", node),
> +                "name");
> +
> +    struct sbrec_multicast_group_table *multicast_group_table =
> +        (struct sbrec_multicast_group_table *)EN_OVSDB_GET(
> +            engine_get_input("SB_multicast_group", node));
> +
> +    struct sbrec_port_binding_table *port_binding_table =
> +        (struct sbrec_port_binding_table *)EN_OVSDB_GET(
> +            engine_get_input("SB_port_binding", node));
> +
> +    struct sbrec_chassis_table *chassis_table =
> +        (struct sbrec_chassis_table *)EN_OVSDB_GET(
> +            engine_get_input("SB_chassis", node));
> +
> +    struct ed_type_mff_ovn_geneve *ed_mff_ovn_geneve =
> +        engine_get_input_data("mff_ovn_geneve", node);
> +
> +    struct ovsrec_open_vswitch_table *ovs_table =
> +        (struct ovsrec_open_vswitch_table *)EN_OVSDB_GET(
> +            engine_get_input("OVS_open_vswitch", node));
> +    struct ovsrec_bridge_table *bridge_table =
> +        (struct ovsrec_bridge_table *)EN_OVSDB_GET(
> +            engine_get_input("OVS_bridge", node));
> +    const struct ovsrec_bridge *br_int = get_br_int(bridge_table, ovs_table);
> +    const char *chassis_id = get_ovs_chassis_id(ovs_table);
> +    const struct sbrec_chassis *chassis = NULL;
> +    struct ovsdb_idl_index *sbrec_chassis_by_name =
> +        engine_ovsdb_node_get_index(
> +                engine_get_input("SB_chassis", node),
> +                "name");
> +    if (chassis_id) {
> +        chassis = chassis_lookup_by_name(sbrec_chassis_by_name, chassis_id);
> +    }
> +
> +    ovs_assert(br_int && chassis);
> +
> +    struct ovsrec_interface_table *iface_table =
> +        (struct ovsrec_interface_table *)EN_OVSDB_GET(
> +            engine_get_input("OVS_interface", node));
> +
> +    struct ed_type_ct_zones *ct_zones_data =
> +        engine_get_input_data("ct_zones", node);
> +    struct simap *ct_zones = &ct_zones_data->current;
> +
> +    p_ctx->sbrec_port_binding_by_name = sbrec_port_binding_by_name;
> +    p_ctx->port_binding_table = port_binding_table;
> +    p_ctx->mc_group_table = multicast_group_table;
> +    p_ctx->br_int = br_int;
> +    p_ctx->chassis_table = chassis_table;
> +    p_ctx->iface_table = iface_table;
> +    p_ctx->chassis = chassis;
> +    p_ctx->active_tunnels = &rt_data->active_tunnels;
> +    p_ctx->local_datapaths = &rt_data->local_datapaths;
> +    p_ctx->local_lports = &rt_data->local_lports;
> +    p_ctx->ct_zones = ct_zones;
> +    p_ctx->mff_ovn_geneve = ed_mff_ovn_geneve->mff_ovn_geneve;
> +    p_ctx->local_bindings = &rt_data->lbinding_data.bindings;
> +}
> +
> +static void *
> +en_pflow_output_init(struct engine_node *node OVS_UNUSED,
> +                     struct engine_arg *arg OVS_UNUSED)
> +{
> +    struct ed_type_pflow_output *data = xzalloc(sizeof *data);
> +    ovn_desired_flow_table_init(&data->flow_table);
> +    return data;
> +}
> +
> +static void
> +en_pflow_output_cleanup(void *data)
> +{
> +    struct ed_type_pflow_output *pfo = data;
> +    ovn_desired_flow_table_destroy(&pfo->flow_table);
> +}
> +
> +static void
> +en_pflow_output_run(struct engine_node *node, void *data)
> +{
> +    struct ed_type_pflow_output *pfo = data;
> +    struct ovn_desired_flow_table *pflow_table = &pfo->flow_table;
> +    static bool first_run = true;
> +    if (first_run) {
> +        first_run = false;
> +    } else {
> +        ovn_desired_flow_table_clear(pflow_table);
> +    }
> +
> +    struct ed_type_runtime_data *rt_data =
> +        engine_get_input_data("runtime_data", node);
> +
> +    struct physical_ctx p_ctx;
> +    init_physical_ctx(node, rt_data, &p_ctx);
> +    physical_run(&p_ctx, pflow_table);
> +
> +    engine_set_node_state(node, EN_UPDATED);
> +}
> +
> +static bool
> +pflow_output_sb_port_binding_handler(struct engine_node *node,
> +                                     void *data)
> +{
> +    struct ed_type_runtime_data *rt_data =
> +        engine_get_input_data("runtime_data", node);
> +
> +    struct ed_type_pflow_output *pfo = data;
> +
> +    struct physical_ctx p_ctx;
> +    init_physical_ctx(node, rt_data, &p_ctx);
> +
> +    /* We handle port-binding changes for physical flow processing
> +     * only.  The runtime_data handler of the lflow_output node takes
> +     * care of processing logical flows for any port binding changes.
> +     */
> +    physical_handle_port_binding_changes(&p_ctx, &pfo->flow_table);
> +
> +    engine_set_node_state(node, EN_UPDATED);
> +    return true;
> +}
> +
> +static bool
> +pflow_output_sb_multicast_group_handler(struct engine_node *node, void *data)
> +{
> +    struct ed_type_runtime_data *rt_data =
> +        engine_get_input_data("runtime_data", node);
> +
> +    struct ed_type_pflow_output *pfo = data;
> +
> +    struct physical_ctx p_ctx;
> +    init_physical_ctx(node, rt_data, &p_ctx);
> +
> +    physical_handle_mc_group_changes(&p_ctx, &pfo->flow_table);
> +
> +    engine_set_node_state(node, EN_UPDATED);
> +    return true;
> +}
> +
> +static bool
> +pflow_output_ovs_iface_handler(struct engine_node *node,
> +                               void *data)
> +{
> +    struct ed_type_runtime_data *rt_data =
> +        engine_get_input_data("runtime_data", node);
> +
> +    struct ed_type_pflow_output *pfo = data;
> +
> +    struct physical_ctx p_ctx;
> +    init_physical_ctx(node, rt_data, &p_ctx);
> +
> +    engine_set_node_state(node, EN_UPDATED);
> +    return physical_handle_ovs_iface_changes(&p_ctx, &pfo->flow_table);
> +}
> +
> +/* Handles sbrec_chassis changes.
> + * If a chassis is added or removed, return false so that the
> + * physical flows are recomputed.
> + * For any updates, there is no need for any flow computation.
> + * Encap changes will also result in sbrec_chassis changes,
> + * but we handle encap changes separately.
> + */
> +static bool
> +pflow_output_sb_chassis_handler(struct engine_node *node,
> +                                void *data OVS_UNUSED)
> +{
> +    struct sbrec_chassis_table *chassis_table =
> +        (struct sbrec_chassis_table *)EN_OVSDB_GET(
> +            engine_get_input("SB_chassis", node));
> +
> +    const struct sbrec_chassis *ch;
> +    SBREC_CHASSIS_TABLE_FOR_EACH_TRACKED (ch, chassis_table) {
> +        if (sbrec_chassis_is_deleted(ch) || sbrec_chassis_is_new(ch)) {
> +            return false;
> +        }
> +    }
> +
> +    return true;
> +}
> +
> +static void *
> +en_flow_output_init(struct engine_node *node OVS_UNUSED,
> +                    struct engine_arg *arg OVS_UNUSED)
> +{
> +    return NULL;
> +}
> +
> +static void
> +en_flow_output_cleanup(void *data OVS_UNUSED)
> +{
> +
> +}
> +
> +static void
> +en_flow_output_run(struct engine_node *node, void *data OVS_UNUSED)
> +{
> +    engine_set_node_state(node, EN_UPDATED);
> +}
> +
> +static bool
> +flow_output_pflow_output_handler(struct engine_node *node,
> +                                 void *data OVS_UNUSED)
> +{
> +    engine_set_node_state(node, EN_UPDATED);
> +    return true;
> +}
> +
> +static bool
> +flow_output_lflow_output_handler(struct engine_node *node,
> +                                 void *data OVS_UNUSED)
> +{
> +    engine_set_node_state(node, EN_UPDATED);
> +    return true;
> +}
> +
>  struct ovn_controller_exit_args {
>      bool *exiting;
>      bool *restart;
> @@ -2706,8 +2666,8 @@ main(int argc, char *argv[])
>      ENGINE_NODE_WITH_CLEAR_TRACK_DATA(runtime_data, "runtime_data");
>      ENGINE_NODE(mff_ovn_geneve, "mff_ovn_geneve");
>      ENGINE_NODE(ofctrl_is_connected, "ofctrl_is_connected");
> -    ENGINE_NODE_WITH_CLEAR_TRACK_DATA(physical_flow_changes,
> -                                      "physical_flow_changes");
> +    ENGINE_NODE(pflow_output, "physical_flow_output");
> +    ENGINE_NODE(lflow_output, "logical_flow_output");
>      ENGINE_NODE(flow_output, "flow_output");
>      ENGINE_NODE(addr_sets, "addr_sets");
>      ENGINE_NODE_WITH_CLEAR_TRACK_DATA(port_groups, "port_groups");
> @@ -2731,58 +2691,71 @@ main(int argc, char *argv[])
>      engine_add_input(&en_port_groups, &en_runtime_data,
>                       port_groups_runtime_data_handler);
>
> -    /* Engine node physical_flow_changes indicates whether
> -     * we can recompute only physical flows or we can
> -     * incrementally process the physical flows.
> -     *
> -     * Note: The order of inputs is important, all OVS interface changes must
> +    /* Note: The order of inputs is important; all OVS interface changes must
>       * be handled before any ct_zone changes.
>       */
> -    engine_add_input(&en_physical_flow_changes, &en_ovs_interface,
> -                     physical_flow_changes_ovs_iface_handler);
> -    engine_add_input(&en_physical_flow_changes, &en_ct_zones,
> -                     physical_flow_changes_ct_zones_handler);
> -
> -    engine_add_input(&en_flow_output, &en_addr_sets,
> -                     flow_output_addr_sets_handler);
> -    engine_add_input(&en_flow_output, &en_port_groups,
> -                     flow_output_port_groups_handler);
> -    engine_add_input(&en_flow_output, &en_runtime_data,
> -                     flow_output_runtime_data_handler);
> -    engine_add_input(&en_flow_output, &en_mff_ovn_geneve, NULL);
> -    engine_add_input(&en_flow_output, &en_physical_flow_changes,
> -                     flow_output_physical_flow_changes_handler);
> +    engine_add_input(&en_pflow_output, &en_ovs_interface,
> +                     pflow_output_ovs_iface_handler);
> +    engine_add_input(&en_pflow_output, &en_ct_zones,
> +                     NULL);
> +    engine_add_input(&en_pflow_output, &en_sb_chassis,
> +                     pflow_output_sb_chassis_handler);
> +    engine_add_input(&en_pflow_output, &en_sb_port_binding,
> +                     pflow_output_sb_port_binding_handler);
> +    engine_add_input(&en_pflow_output, &en_sb_multicast_group,
> +                     pflow_output_sb_multicast_group_handler);
> +
> +    engine_add_input(&en_pflow_output, &en_runtime_data,
> +                     NULL);
> +    engine_add_input(&en_pflow_output, &en_sb_encap, NULL);
> +    engine_add_input(&en_pflow_output, &en_mff_ovn_geneve, NULL);
> +    engine_add_input(&en_pflow_output, &en_ovs_open_vswitch, NULL);
> +    engine_add_input(&en_pflow_output, &en_ovs_bridge, NULL);
> +
> +    engine_add_input(&en_lflow_output, &en_addr_sets,
> +                     lflow_output_addr_sets_handler);
> +    engine_add_input(&en_lflow_output, &en_port_groups,
> +                     lflow_output_port_groups_handler);
> +    engine_add_input(&en_lflow_output, &en_runtime_data,
> +                     lflow_output_runtime_data_handler);
>
>      /* We need this input nodes for only data. Hence the noop handler. */
> -    engine_add_input(&en_flow_output, &en_ct_zones, engine_noop_handler);
> -    engine_add_input(&en_flow_output, &en_ovs_interface, engine_noop_handler);
> -
> -    engine_add_input(&en_flow_output, &en_ovs_open_vswitch, NULL);
> -    engine_add_input(&en_flow_output, &en_ovs_bridge, NULL);
> -
> -    engine_add_input(&en_flow_output, &en_sb_chassis, NULL);
> -    engine_add_input(&en_flow_output, &en_sb_encap, NULL);
> -    engine_add_input(&en_flow_output, &en_sb_multicast_group,
> -                     flow_output_sb_multicast_group_handler);
> -    engine_add_input(&en_flow_output, &en_sb_port_binding,
> -                     flow_output_sb_port_binding_handler);
> -    engine_add_input(&en_flow_output, &en_sb_mac_binding,
> -                     flow_output_sb_mac_binding_handler);
> -    engine_add_input(&en_flow_output, &en_sb_logical_flow,
> -                     flow_output_sb_logical_flow_handler);
> +    engine_add_input(&en_lflow_output, &en_ct_zones,
> +                     engine_noop_handler);
> +    engine_add_input(&en_lflow_output, &en_ovs_interface,
> +                     engine_noop_handler);
> +    engine_add_input(&en_lflow_output, &en_sb_chassis,
> +                     engine_noop_handler);
> +    engine_add_input(&en_lflow_output, &en_sb_multicast_group,
> +                     engine_noop_handler);
> +
> +    /* Any changes to the port bindings need not be handled
> +     * by the lflow_output engine.  We still need sb_port_binding
> +     * as an input to access the port binding data in lflow.c,
> +     * hence the noop handler. */
> +    engine_add_input(&en_lflow_output, &en_sb_port_binding,
> +                     engine_noop_handler);
> +
> +    engine_add_input(&en_lflow_output, &en_ovs_open_vswitch, NULL);
> +    engine_add_input(&en_lflow_output, &en_ovs_bridge, NULL);
> +
> +    engine_add_input(&en_lflow_output, &en_sb_mac_binding,
> +                     lflow_output_sb_mac_binding_handler);
> +    engine_add_input(&en_lflow_output, &en_sb_logical_flow,
> +                     lflow_output_sb_logical_flow_handler);
>      /* Using a noop handler since we don't really need any data from datapath
>       * groups or a full recompute.  Update of a datapath group will put
>       * logical flow into the tracked list, so the logical flow handler will
>       * process all changes. */
> -    engine_add_input(&en_flow_output, &en_sb_logical_dp_group,
> +    engine_add_input(&en_lflow_output, &en_sb_logical_dp_group,
>                       engine_noop_handler);
> -    engine_add_input(&en_flow_output, &en_sb_dhcp_options, NULL);
> -    engine_add_input(&en_flow_output, &en_sb_dhcpv6_options, NULL);
> -    engine_add_input(&en_flow_output, &en_sb_dns, NULL);
> -    engine_add_input(&en_flow_output, &en_sb_load_balancer,
> -                     flow_output_sb_load_balancer_handler);
> -    engine_add_input(&en_flow_output, &en_sb_fdb,
> -                     flow_output_sb_fdb_handler);
> +    engine_add_input(&en_lflow_output, &en_sb_dhcp_options, NULL);
> +    engine_add_input(&en_lflow_output, &en_sb_dhcpv6_options, NULL);
> +    engine_add_input(&en_lflow_output, &en_sb_dns, NULL);
> +    engine_add_input(&en_lflow_output, &en_sb_load_balancer,
> +                     lflow_output_sb_load_balancer_handler);
> +    engine_add_input(&en_lflow_output, &en_sb_fdb,
> +                     lflow_output_sb_fdb_handler);
>
>      engine_add_input(&en_ct_zones, &en_ovs_open_vswitch, NULL);
>      engine_add_input(&en_ct_zones, &en_ovs_bridge, NULL);
> @@ -2810,6 +2783,11 @@ main(int argc, char *argv[])
>      engine_add_input(&en_runtime_data, &en_ovs_interface,
>                       runtime_data_ovs_interface_handler);
>
> +    engine_add_input(&en_flow_output, &en_lflow_output,
> +                     flow_output_lflow_output_handler);
> +    engine_add_input(&en_flow_output, &en_pflow_output,
> +                     flow_output_pflow_output_handler);
> +
>      struct engine_arg engine_arg = {
>          .sb_idl = ovnsb_idl_loop.idl,
>          .ovs_idl = ovs_idl_loop.idl,
> @@ -2832,25 +2810,27 @@ main(int argc, char *argv[])
>      engine_ovsdb_node_add_index(&en_sb_datapath_binding, "key",
>                                  sbrec_datapath_binding_by_key);
>
> -    struct ed_type_flow_output *flow_output_data =
> -        engine_get_internal_data(&en_flow_output);
> +    struct ed_type_lflow_output *lflow_output_data =
> +        engine_get_internal_data(&en_lflow_output);
> +    struct ed_type_lflow_output *pflow_output_data =
> +        engine_get_internal_data(&en_pflow_output);
>      struct ed_type_ct_zones *ct_zones_data =
>          engine_get_internal_data(&en_ct_zones);
>      struct ed_type_runtime_data *runtime_data =
>          engine_get_internal_data(&en_runtime_data);
>
> -    ofctrl_init(&flow_output_data->group_table,
> -                &flow_output_data->meter_table,
> +    ofctrl_init(&lflow_output_data->group_table,
> +                &lflow_output_data->meter_table,
>                  get_ofctrl_probe_interval(ovs_idl_loop.idl));
>      ofctrl_seqno_init();
>
>      unixctl_command_register("group-table-list", "", 0, 0,
>                               extend_table_list,
> -                             &flow_output_data->group_table);
> +                             &lflow_output_data->group_table);
>
>      unixctl_command_register("meter-table-list", "", 0, 0,
>                               extend_table_list,
> -                             &flow_output_data->meter_table);
> +                             &lflow_output_data->meter_table);
>
>      unixctl_command_register("ct-zone-list", "", 0, 0,
>                               ct_zone_list,
> @@ -2864,14 +2844,14 @@ main(int argc, char *argv[])
>                               NULL);
>      unixctl_command_register("lflow-cache/flush", "", 0, 0,
>                               lflow_cache_flush_cmd,
> -                             &flow_output_data->pd);
> +                             &lflow_output_data->pd);
>      /* Keep deprecated 'flush-lflow-cache' command for now. */
>      unixctl_command_register("flush-lflow-cache", "[deprecated]", 0, 0,
>                               lflow_cache_flush_cmd,
> -                             &flow_output_data->pd);
> +                             &lflow_output_data->pd);
>      unixctl_command_register("lflow-cache/show-stats", "", 0, 0,
>                               lflow_cache_show_stats_cmd,
> -                             &flow_output_data->pd);
> +                             &lflow_output_data->pd);
>
>      bool reset_ovnsb_idl_min_index = false;
>      unixctl_command_register("sb-cluster-state-reset", "", 0, 0,
> @@ -3117,13 +3097,17 @@ main(int argc, char *argv[])
>                          runtime_data ? &runtime_data->lbinding_data : NULL;
>                      if_status_mgr_update(if_mgr, binding_data);
>
> -                    flow_output_data = engine_get_data(&en_flow_output);
> -                    if (flow_output_data && ct_zones_data) {
> -                        ofctrl_put(&flow_output_data->flow_table,
> +                    lflow_output_data = engine_get_data(&en_lflow_output);
> +                    pflow_output_data = engine_get_data(&en_pflow_output);
> +                    if (lflow_output_data && pflow_output_data &&
> +                        ct_zones_data) {
> +                        ofctrl_put(&lflow_output_data->flow_table,
> +                                   &pflow_output_data->flow_table,
>                                     &ct_zones_data->pending,
>                                     sbrec_meter_table_get(ovnsb_idl_loop.idl),
>                                     ofctrl_seqno_get_req_cfg(),
> -                                   engine_node_changed(&en_flow_output));
> +                                   engine_node_changed(&en_lflow_output),
> +                                   engine_node_changed(&en_pflow_output));
>                      }
>                      ofctrl_seqno_run(ofctrl_get_cur_cfg());
>                      if_status_mgr_run(if_mgr, binding_data, !ovnsb_idl_txn,
> @@ -3491,7 +3475,7 @@ lflow_cache_flush_cmd(struct unixctl_conn *conn OVS_UNUSED,
>                        void *arg_)
>  {
>      VLOG_INFO("User triggered lflow cache flush.");
> -    struct flow_output_persistent_data *fo_pd = arg_;
> +    struct lflow_output_persistent_data *fo_pd = arg_;
>      lflow_cache_flush(fo_pd->lflow_cache);
>      fo_pd->conj_id_ofs = 1;
>      engine_set_force_recompute(true);
> @@ -3503,7 +3487,7 @@ static void
>  lflow_cache_show_stats_cmd(struct unixctl_conn *conn, int argc OVS_UNUSED,
>                             const char *argv[] OVS_UNUSED, void *arg_)
>  {
> -    struct flow_output_persistent_data *fo_pd = arg_;
> +    struct lflow_output_persistent_data *fo_pd = arg_;
>      struct lflow_cache *lc = fo_pd->lflow_cache;
>      struct ds ds = DS_EMPTY_INITIALIZER;
>
> diff --git a/controller/physical.c b/controller/physical.c
> index 018e09540c..04259d44a6 100644
> --- a/controller/physical.c
> +++ b/controller/physical.c
> @@ -1953,22 +1953,3 @@ physical_clear_unassoc_flows_with_db(struct ovn_desired_flow_table *flow_table)
>          ofctrl_remove_flows(flow_table, hc_uuid);
>      }
>  }
> -
> -void
> -physical_clear_dp_flows(struct physical_ctx *p_ctx,
> -                        struct hmapx *ct_updated_datapaths,
> -                        struct ovn_desired_flow_table *flow_table)
> -{
> -    const struct sbrec_port_binding *binding;
> -    SBREC_PORT_BINDING_TABLE_FOR_EACH (binding, p_ctx->port_binding_table) {
> -        if (!hmapx_find(ct_updated_datapaths, binding->datapath)) {
> -            continue;
> -        }
> -        const struct sbrec_port_binding *peer =
> -            get_binding_peer(p_ctx->sbrec_port_binding_by_name, binding);
> -        ofctrl_remove_flows(flow_table, &binding->header_.uuid);
> -        if (peer) {
> -            ofctrl_remove_flows(flow_table, &peer->header_.uuid);
> -        }
> -    }
> -}
> diff --git a/controller/physical.h b/controller/physical.h
> index 0bf13f2683..feab41df4c 100644
> --- a/controller/physical.h
> +++ b/controller/physical.h
> @@ -56,16 +56,12 @@ struct physical_ctx {
>      const struct simap *ct_zones;
>      enum mf_field_id mff_ovn_geneve;
>      struct shash *local_bindings;
> -    struct hmapx *ct_updated_datapaths;
>  };
>
>  void physical_register_ovs_idl(struct ovsdb_idl *);
>  void physical_run(struct physical_ctx *,
>                    struct ovn_desired_flow_table *);
>  void physical_clear_unassoc_flows_with_db(struct ovn_desired_flow_table *);
> -void physical_clear_dp_flows(struct physical_ctx *p_ctx,
> -                             struct hmapx *ct_updated_datapaths,
> -                             struct ovn_desired_flow_table *flow_table);
>  void physical_handle_port_binding_changes(struct physical_ctx *,
>                                            struct ovn_desired_flow_table *);
>  void physical_handle_mc_group_changes(struct physical_ctx *,
> --
> 2.31.1
>
Han Zhou June 2, 2021, 6:21 a.m. UTC | #2
On Tue, Jun 1, 2021 at 5:10 PM Numan Siddique <numans@ovn.org> wrote:
>
> On Sun, May 30, 2021 at 11:45 PM <numans@ovn.org> wrote:
> >
> > From: Numan Siddique <numans@ovn.org>
> >
> > Presently, the 'flow_output' engine node recomputes physical
> > flows by calling physical_run() in the 'physical_flow_changes'
> > handler in some scenarios.  Because of this, an engine run can
> > do a full recompute of physical flows but not full recompute
> > of logical flows.  Although this works now, it is problematic
> > as the same desired flow table is used for both physical and
> > logical flows.
> >
> > This patch now separates the handling of logical flows and
> > physical flows and removes the 'physical_flow_changes' engine
> > node.  Two separate engine nodes are added - lflow_output and
> > pflow_output with their own flow tables and these two nodes are
> > now inputs to the main engine node - flow_output.  This separation
> > reflects the data dependency more clearly.
> >
> > CC: Han Zhou <hzhou@ovn.org>
> > Signed-off-by: Numan Siddique <numans@ovn.org>
>
> Hi Han,
>
> Gentle ping.  Wondering if you got the chance to take a look at the
> first patch of the series.
>

Hi Numan,

Thanks for the revision, and sorry for the slow response. I see that you
updated some change handlers from noop to NULL for pflow_output, which
makes more sense now. However, there are still several noop handlers for
the lflow_output node.
When we use a noop handler, it means that the input is used to compute
the output (otherwise it wouldn't be a dependency), but we know that the
related change is handled *indirectly* while handling another input. I
think this should be the only situation in which we use a noop handler,
and when using it we need to document where the change is handled. Please
see my comments inline for each of the inputs below.
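
To make the distinction concrete, below is a rough sketch of the three
ways an input can be wired to an engine node; the node and input names
are made up for illustration only and are not from this patch:

    /* A real change handler: the input's changes are processed
     * incrementally. */
    engine_add_input(&en_example_output, &en_input_a,
                     example_output_input_a_handler);

    /* NULL handler: any change to this input forces a full recompute
     * of en_example_output. */
    engine_add_input(&en_example_output, &en_input_b, NULL);

    /* noop handler: the input is needed only for its data, and its
     * changes are known to be handled indirectly while handling
     * en_input_a's changes.  Document that next to the noop handler. */
    engine_add_input(&en_example_output, &en_input_c, engine_noop_handler);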

> It would be great if the first patch can be considered before we
> branch (or before the 21.06 release).
>
I understand this patch has been around for quite a while, but since it
is more of a refactoring than a feature or bug fix, I'd in fact prefer
merging it *after* the release, together with the related patches in the
series, given our past experience with big changes in I-P (including many
of my own patches) that caused regressions. Once we are confident enough,
we could still backport it to 21.06 if necessary. What do you think?


> > @@ -2731,58 +2691,71 @@ main(int argc, char *argv[])
> >      engine_add_input(&en_port_groups, &en_runtime_data,
> >                       port_groups_runtime_data_handler);
> >
> > -    /* Engine node physical_flow_changes indicates whether
> > -     * we can recompute only physical flows or we can
> > -     * incrementally process the physical flows.
> > -     *
> > -     * Note: The order of inputs is important, all OVS interface changes must
> > +    /* Note: The order of inputs is important, all OVS interface changes must
> >       * be handled before any ct_zone changes.
> >       */
> > -    engine_add_input(&en_physical_flow_changes, &en_ovs_interface,
> > -                     physical_flow_changes_ovs_iface_handler);
> > -    engine_add_input(&en_physical_flow_changes, &en_ct_zones,
> > -                     physical_flow_changes_ct_zones_handler);
> > -
> > -    engine_add_input(&en_flow_output, &en_addr_sets,
> > -                     flow_output_addr_sets_handler);
> > -    engine_add_input(&en_flow_output, &en_port_groups,
> > -                     flow_output_port_groups_handler);
> > -    engine_add_input(&en_flow_output, &en_runtime_data,
> > -                     flow_output_runtime_data_handler);
> > -    engine_add_input(&en_flow_output, &en_mff_ovn_geneve, NULL);
> > -    engine_add_input(&en_flow_output, &en_physical_flow_changes,
> > -                     flow_output_physical_flow_changes_handler);
> > +    engine_add_input(&en_pflow_output, &en_ovs_interface,
> > +                     pflow_output_ovs_iface_handler);
> > +    engine_add_input(&en_pflow_output, &en_ct_zones,
> > +                     NULL);
> > +    engine_add_input(&en_pflow_output, &en_sb_chassis,
> > +                     pflow_output_sb_chassis_handler);
> > +    engine_add_input(&en_pflow_output, &en_sb_port_binding,
> > +                     pflow_output_sb_port_binding_handler);
> > +    engine_add_input(&en_pflow_output, &en_sb_multicast_group,
> > +                     pflow_output_sb_multicast_group_handler);
> > +
> > +    engine_add_input(&en_pflow_output, &en_runtime_data,
> > +                     NULL);
> > +    engine_add_input(&en_pflow_output, &en_sb_encap, NULL);
> > +    engine_add_input(&en_pflow_output, &en_mff_ovn_geneve, NULL);
> > +    engine_add_input(&en_pflow_output, &en_ovs_open_vswitch, NULL);
> > +    engine_add_input(&en_pflow_output, &en_ovs_bridge, NULL);
> > +
> > +    engine_add_input(&en_lflow_output, &en_addr_sets,
> > +                     lflow_output_addr_sets_handler);
> > +    engine_add_input(&en_lflow_output, &en_port_groups,
> > +                     lflow_output_port_groups_handler);
> > +    engine_add_input(&en_lflow_output, &en_runtime_data,
> > +                     lflow_output_runtime_data_handler);
> >
> >      /* We need this input nodes for only data. Hence the noop handler. */
> > -    engine_add_input(&en_flow_output, &en_ct_zones, engine_noop_handler);
> > -    engine_add_input(&en_flow_output, &en_ovs_interface, engine_noop_handler);
> > -
> > -    engine_add_input(&en_flow_output, &en_ovs_open_vswitch, NULL);
> > -    engine_add_input(&en_flow_output, &en_ovs_bridge, NULL);
> > -
> > -    engine_add_input(&en_flow_output, &en_sb_chassis, NULL);
> > -    engine_add_input(&en_flow_output, &en_sb_encap, NULL);
> > -    engine_add_input(&en_flow_output, &en_sb_multicast_group,
> > -                     flow_output_sb_multicast_group_handler);
> > -    engine_add_input(&en_flow_output, &en_sb_port_binding,
> > -                     flow_output_sb_port_binding_handler);
> > -    engine_add_input(&en_flow_output, &en_sb_mac_binding,
> > -                     flow_output_sb_mac_binding_handler);
> > -    engine_add_input(&en_flow_output, &en_sb_logical_flow,
> > -                     flow_output_sb_logical_flow_handler);
> > +    engine_add_input(&en_lflow_output, &en_ct_zones,
> > +                     engine_noop_handler);

It seems ct_zones is only used for physical flow computation, and we
handle it by recomputing physical flows. I didn't see it used for logical
flow computation, so can we remove it from the dependencies?

> > +    engine_add_input(&en_lflow_output, &en_ovs_interface,
> > +                     engine_noop_handler);

The same applies to ovs_interface: I didn't see it used for lflow_output.
Shall we remove it?

> > +    engine_add_input(&en_lflow_output, &en_sb_chassis,
> > +                     engine_noop_handler);

This one doesn't seem to justify a noop handler. It is used for lflow
computation, and we cannot handle the changes incrementally (for now), so
we should use NULL just to trigger a recompute.

> > +    engine_add_input(&en_lflow_output, &en_sb_multicast_group,
> > +                     engine_noop_handler);
> >
> > +    /* Any changes to the port binding, need not be handled
> > +     * for lflow_outout engine.  We still need sb_port_binding
> > +     * as input to access the port binding data in lflow.c and
> > +     * hence the noop handler. */
> > +    engine_add_input(&en_lflow_output, &en_sb_port_binding,
> > +                     engine_noop_handler);

For multicast_group and port_binding, we didn't handle these changes
previously, but that is not correct. I submitted a patch series last week.
Please help review it; it can probably be incorporated into this patch
later if it looks OK.
https://patchwork.ozlabs.org/project/ovn/list/?series=246286

Thanks,
Han
Numan Siddique June 2, 2021, 4:39 p.m. UTC | #3
On Wed, Jun 2, 2021 at 2:22 AM Han Zhou <hzhou@ovn.org> wrote:
>
> On Tue, Jun 1, 2021 at 5:10 PM Numan Siddique <numans@ovn.org> wrote:
> >
> > On Sun, May 30, 2021 at 11:45 PM <numans@ovn.org> wrote:
> > >
> > > From: Numan Siddique <numans@ovn.org>
> > >
> > > Presently, the 'flow_output' engine node recomputes physical
> > > flows by calling physical_run() in the 'physical_flow_changes'
> > > handler in some scenarios.  Because of this, an engine run can
> > > do a full recompute of physical flows but not full recompute
> > > of logical flows.  Although this works now, it is problematic
> > > as the same desired flow table is used for both physical and
> > > logical flows.
> > >
> > > This patch now separates the handling of logical flows and
> > > physical flows and removes the 'physical_flow_changes' engine
> > > node.  Two separate engine nodes are added - lflow_output and
> > > pflow_output with their own flow tables and these two nodes are
> > > now inputs to the main engine node - flow_output.  This separation
> > > reflects the data dependency more clearly.
> > >
> > > CC: Han Zhou <hzhou@ovn.org>
> > > Signed-off-by: Numan Siddique <numans@ovn.org>
> >
> > Hi Han,
> >
> > Gentle ping.  Wondering if you got the chance to take a look at the
> > first patch of the series.
> >
>
> Hi Numan,
>
> Thanks for the revision, and sorry for the slow response. I see that you
> updated some change handlers from noop to NULL for pflow_output, which
> makes more sense now. However, there are still several noop handlers for
> the lflow_output node.
> When we use a noop handler, it means that the input is used to compute
> the output (otherwise it wouldn't be a dependency), but we know that the
> related change is handled *indirectly* while handling another input. I
> think this should be the only situation in which we use a noop handler,
> and when using it we need to document where the change is handled. Please
> see my comments inline for each of the inputs below.
>
> > It would be great if the first patch can be considered before we
> > branch (or before the 21.06 release).
> >
> I understand this patch has been around for quite a while, but since it
> is more of a refactoring than a feature or bug fix, I'd in fact prefer
> merging it *after* the release, together with the related patches in the
> series, given our past experience with big changes in I-P (including many
> of my own patches) that caused regressions. Once we are confident enough,
> we could still backport it to 21.06 if necessary. What do you think?

Hi Han,

Thanks for the review comments.  I was hoping to get this patch in,
but I agree.  We can consider these patches after the branch and,
if they are stable, maybe backport them later.  I am more interested in
the other patches in this series, which are required for scale setups.


>
>
> > > @@ -2731,58 +2691,71 @@ main(int argc, char *argv[])
> > >      engine_add_input(&en_port_groups, &en_runtime_data,
> > >                       port_groups_runtime_data_handler);
> > >
> > > -    /* Engine node physical_flow_changes indicates whether
> > > -     * we can recompute only physical flows or we can
> > > -     * incrementally process the physical flows.
> > > -     *
> > > -     * Note: The order of inputs is important, all OVS interface changes must
> > > +    /* Note: The order of inputs is important, all OVS interface changes must
> > >       * be handled before any ct_zone changes.
> > >       */
> > > -    engine_add_input(&en_physical_flow_changes, &en_ovs_interface,
> > > -                     physical_flow_changes_ovs_iface_handler);
> > > -    engine_add_input(&en_physical_flow_changes, &en_ct_zones,
> > > -                     physical_flow_changes_ct_zones_handler);
> > > -
> > > -    engine_add_input(&en_flow_output, &en_addr_sets,
> > > -                     flow_output_addr_sets_handler);
> > > -    engine_add_input(&en_flow_output, &en_port_groups,
> > > -                     flow_output_port_groups_handler);
> > > -    engine_add_input(&en_flow_output, &en_runtime_data,
> > > -                     flow_output_runtime_data_handler);
> > > -    engine_add_input(&en_flow_output, &en_mff_ovn_geneve, NULL);
> > > -    engine_add_input(&en_flow_output, &en_physical_flow_changes,
> > > -                     flow_output_physical_flow_changes_handler);
> > > +    engine_add_input(&en_pflow_output, &en_ovs_interface,
> > > +                     pflow_output_ovs_iface_handler);
> > > +    engine_add_input(&en_pflow_output, &en_ct_zones,
> > > +                     NULL);
> > > +    engine_add_input(&en_pflow_output, &en_sb_chassis,
> > > +                     pflow_output_sb_chassis_handler);
> > > +    engine_add_input(&en_pflow_output, &en_sb_port_binding,
> > > +                     pflow_output_sb_port_binding_handler);
> > > +    engine_add_input(&en_pflow_output, &en_sb_multicast_group,
> > > +                     pflow_output_sb_multicast_group_handler);
> > > +
> > > +    engine_add_input(&en_pflow_output, &en_runtime_data,
> > > +                     NULL);
> > > +    engine_add_input(&en_pflow_output, &en_sb_encap, NULL);
> > > +    engine_add_input(&en_pflow_output, &en_mff_ovn_geneve, NULL);
> > > +    engine_add_input(&en_pflow_output, &en_ovs_open_vswitch, NULL);
> > > +    engine_add_input(&en_pflow_output, &en_ovs_bridge, NULL);
> > > +
> > > +    engine_add_input(&en_lflow_output, &en_addr_sets,
> > > +                     lflow_output_addr_sets_handler);
> > > +    engine_add_input(&en_lflow_output, &en_port_groups,
> > > +                     lflow_output_port_groups_handler);
> > > +    engine_add_input(&en_lflow_output, &en_runtime_data,
> > > +                     lflow_output_runtime_data_handler);
> > >
> > >      /* We need this input nodes for only data. Hence the noop handler. */
> > > -    engine_add_input(&en_flow_output, &en_ct_zones, engine_noop_handler);
> > > -    engine_add_input(&en_flow_output, &en_ovs_interface, engine_noop_handler);
> > > -
> > > -    engine_add_input(&en_flow_output, &en_ovs_open_vswitch, NULL);
> > > -    engine_add_input(&en_flow_output, &en_ovs_bridge, NULL);
> > > -
> > > -    engine_add_input(&en_flow_output, &en_sb_chassis, NULL);
> > > -    engine_add_input(&en_flow_output, &en_sb_encap, NULL);
> > > -    engine_add_input(&en_flow_output, &en_sb_multicast_group,
> > > -                     flow_output_sb_multicast_group_handler);
> > > -    engine_add_input(&en_flow_output, &en_sb_port_binding,
> > > -                     flow_output_sb_port_binding_handler);
> > > -    engine_add_input(&en_flow_output, &en_sb_mac_binding,
> > > -                     flow_output_sb_mac_binding_handler);
> > > -    engine_add_input(&en_flow_output, &en_sb_logical_flow,
> > > -                     flow_output_sb_logical_flow_handler);
> > > +    engine_add_input(&en_lflow_output, &en_ct_zones,
> > > +                     engine_noop_handler);
>
> It seems ct_zones is only used for physical flow computation, and we
> handle it by recomputing physical flows. I didn't see it used for logical
> flow computation, so can we remove it from the dependencies?

I agree.  I'll remove it in v9.

>
> > > +    engine_add_input(&en_lflow_output, &en_ovs_interface,
> > > +                     engine_noop_handler);
>
> The same applies to ovs_interface: I didn't see it used for lflow_output.
> Shall we remove it?

Ack.

>
> > > +    engine_add_input(&en_lflow_output, &en_sb_chassis,
> > > +                     engine_noop_handler);
>
> This one doesn't seem to justify a noop handler. It is used for lflow
> computation, and we cannot handle the changes incrementally (for now), so
> we should use NULL just to trigger a recompute.

Ok.  I thought that handling the chassis changes in pflow_output was
enough.  In v9 I will make the sb_chassis input handler NULL for both
pflow_output and lflow_output, and in the next patch add a handler for
both pflow and lflow.
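
Roughly, the v9 wiring for sb_chassis would then be (just a sketch of the
intent, not the final patch; the follow-up handler names mentioned below
are only placeholders):

    /* v9: no incremental handling yet, so any SB_chassis change triggers
     * a full recompute of both nodes. */
    engine_add_input(&en_pflow_output, &en_sb_chassis, NULL);
    engine_add_input(&en_lflow_output, &en_sb_chassis, NULL);

and the follow-up patch would replace the NULLs with
pflow_output_sb_chassis_handler and an lflow-side handler.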

>
> > > +    engine_add_input(&en_lflow_output, &en_sb_multicast_group,
> > > +                     engine_noop_handler);
> > >
> > > +    /* Any changes to the port binding, need not be handled
> > > +     * for lflow_outout engine.  We still need sb_port_binding
> > > +     * as input to access the port binding data in lflow.c and
> > > +     * hence the noop handler. */
> > > +    engine_add_input(&en_lflow_output, &en_sb_port_binding,
> > > +                     engine_noop_handler);
>
> For multicast_group and port_binding, we didn't handle these changes
> previously, but that is not correct. I submitted a patch series last week.
> Please help review it; it can probably be incorporated into this patch
> later if it looks OK.
> https://patchwork.ozlabs.org/project/ovn/list/?series=246286

Ok.  For v9 I'll leave this as is.  Either you can update your patch series
or I can update mine, depending on whose patches get in first.

Thanks
Numan

>
> Thanks,
> Han

Patch

diff --git a/TODO.rst b/TODO.rst
index c89fe203e1..618ea4844a 100644
--- a/TODO.rst
+++ b/TODO.rst
@@ -164,3 +164,9 @@  OVN To-do List
     to find a way of determining if routing has already been executed (on a
     different hypervisor) for the IP multicast packet being processed locally
     in the router pipeline.
+
+* ovn-controller Incremental processing
+
+  * physical.c has a global simap -localvif_to_ofport which stores the
+    local OVS interfaces and the ofport numbers. Move this to the engine data
+    of the engine data node - ed_type_pflow_output.
diff --git a/controller/ofctrl.c b/controller/ofctrl.c
index c29c3d1805..053631590b 100644
--- a/controller/ofctrl.c
+++ b/controller/ofctrl.c
@@ -173,7 +173,7 @@  struct sb_flow_ref {
     struct uuid sb_uuid;
 };
 
-/* A installed flow, in static variable installed_flows.
+/* An installed flow, in static variable installed_lflows/installed_pflows.
  *
  * Installed flows are updated in ofctrl_put for maintaining the flow
  * installation to OVS. They are updated according to desired flows: either by
@@ -234,7 +234,7 @@  static struct desired_flow *desired_flow_lookup_conjunctive(
 static void desired_flow_destroy(struct desired_flow *);
 
 static struct installed_flow *installed_flow_lookup(
-    const struct ovn_flow *target);
+    const struct ovn_flow *target, struct hmap *installed_flows);
 static void installed_flow_destroy(struct installed_flow *);
 static struct installed_flow *installed_flow_dup(struct desired_flow *);
 static struct desired_flow *installed_flow_get_active(struct installed_flow *);
@@ -302,9 +302,12 @@  static ovs_be32 xid, xid2;
  * zero, to avoid unbounded buffering. */
 static struct rconn_packet_counter *tx_counter;
 
-/* Flow table of "struct ovn_flow"s, that holds the flow table currently
- * installed in the switch. */
-static struct hmap installed_flows;
+/* Flow table of "struct ovn_flow"s, that holds the logical flow table
+ * currently installed in the switch. */
+static struct hmap installed_lflows;
+/* Flow table of "struct ovn_flow"s, that holds the physical flow table
+ * currently installed in the switch. */
+static struct hmap installed_pflows;
 
 /* A reference to the group_table. */
 static struct ovn_extend_table *groups;
@@ -343,7 +346,8 @@  ofctrl_init(struct ovn_extend_table *group_table,
     swconn = rconn_create(inactivity_probe_interval, 0,
                           DSCP_DEFAULT, 1 << OFP15_VERSION);
     tx_counter = rconn_packet_counter_create();
-    hmap_init(&installed_flows);
+    hmap_init(&installed_lflows);
+    hmap_init(&installed_pflows);
     ovs_list_init(&flow_updates);
     ovn_init_symtab(&symtab);
     groups = group_table;
@@ -1426,11 +1430,12 @@  desired_flow_lookup_conjunctive(struct ovn_desired_flow_table *flow_table,
 /* Finds and returns an installed_flow in installed_flows whose key is
  * identical to 'target''s key, or NULL if there is none. */
 static struct installed_flow *
-installed_flow_lookup(const struct ovn_flow *target)
+installed_flow_lookup(const struct ovn_flow *target,
+                      struct hmap *installed_flows)
 {
     struct installed_flow *i;
     HMAP_FOR_EACH_WITH_HASH (i, match_hmap_node, target->hash,
-                             &installed_flows) {
+                             installed_flows) {
         struct ovn_flow *f = &i->flow;
         if (f->table_id == target->table_id
             && f->priority == target->priority
@@ -1542,8 +1547,14 @@  static void
 ovn_installed_flow_table_clear(void)
 {
     struct installed_flow *f, *next;
-    HMAP_FOR_EACH_SAFE (f, next, match_hmap_node, &installed_flows) {
-        hmap_remove(&installed_flows, &f->match_hmap_node);
+    HMAP_FOR_EACH_SAFE (f, next, match_hmap_node, &installed_lflows) {
+        hmap_remove(&installed_lflows, &f->match_hmap_node);
+        unlink_all_refs_for_installed_flow(f);
+        installed_flow_destroy(f);
+    }
+
+    HMAP_FOR_EACH_SAFE (f, next, match_hmap_node, &installed_pflows) {
+        hmap_remove(&installed_pflows, &f->match_hmap_node);
         unlink_all_refs_for_installed_flow(f);
         installed_flow_destroy(f);
     }
@@ -1553,7 +1564,8 @@  static void
 ovn_installed_flow_table_destroy(void)
 {
     ovn_installed_flow_table_clear();
-    hmap_destroy(&installed_flows);
+    hmap_destroy(&installed_lflows);
+    hmap_destroy(&installed_pflows);
 }
 
 /* Flow table update. */
@@ -1829,6 +1841,7 @@  installed_flow_del(struct ovn_flow *i,
 static void
 update_installed_flows_by_compare(struct ovn_desired_flow_table *flow_table,
                                   struct ofputil_bundle_ctrl_msg *bc,
+                                  struct hmap *installed_flows,
                                   struct ovs_list *msgs)
 {
     ovs_assert(ovs_list_is_empty(&flow_table->tracked_flows));
@@ -1836,7 +1849,7 @@  update_installed_flows_by_compare(struct ovn_desired_flow_table *flow_table,
      * longer desired, delete them; if any of them should have different
      * actions, update them. */
     struct installed_flow *i, *next;
-    HMAP_FOR_EACH_SAFE (i, next, match_hmap_node, &installed_flows) {
+    HMAP_FOR_EACH_SAFE (i, next, match_hmap_node, installed_flows) {
         unlink_all_refs_for_installed_flow(i);
         struct desired_flow *d = desired_flow_lookup(flow_table, &i->flow);
         if (!d) {
@@ -1845,7 +1858,7 @@  update_installed_flows_by_compare(struct ovn_desired_flow_table *flow_table,
             installed_flow_del(&i->flow, bc, msgs);
             ovn_flow_log(&i->flow, "removing installed");
 
-            hmap_remove(&installed_flows, &i->match_hmap_node);
+            hmap_remove(installed_flows, &i->match_hmap_node);
             installed_flow_destroy(i);
         } else {
             if (!ofpacts_equal(i->flow.ofpacts, i->flow.ofpacts_len,
@@ -1863,14 +1876,14 @@  update_installed_flows_by_compare(struct ovn_desired_flow_table *flow_table,
      * in the installed flow table. */
     struct desired_flow *d;
     HMAP_FOR_EACH (d, match_hmap_node, &flow_table->match_flow_table) {
-        i = installed_flow_lookup(&d->flow);
+        i = installed_flow_lookup(&d->flow, installed_flows);
         if (!i) {
             ovn_flow_log(&d->flow, "adding installed");
             installed_flow_add(&d->flow, bc, msgs);
 
             /* Copy 'd' from 'flow_table' to installed_flows. */
             i = installed_flow_dup(d);
-            hmap_insert(&installed_flows, &i->match_hmap_node, i->flow.hash);
+            hmap_insert(installed_flows, &i->match_hmap_node, i->flow.hash);
             link_installed_to_desired(i, d);
         } else if (!d->installed_flow) {
             /* This is a desired_flow that conflicts with one installed
@@ -1961,6 +1974,7 @@  merge_tracked_flows(struct ovn_desired_flow_table *flow_table)
 static void
 update_installed_flows_by_track(struct ovn_desired_flow_table *flow_table,
                                 struct ofputil_bundle_ctrl_msg *bc,
+                                struct hmap *installed_flows,
                                 struct ovs_list *msgs)
 {
     merge_tracked_flows(flow_table);
@@ -1979,7 +1993,7 @@  update_installed_flows_by_track(struct ovn_desired_flow_table *flow_table,
                     installed_flow_del(&i->flow, bc, msgs);
                     ovn_flow_log(&i->flow, "removing installed (tracked)");
 
-                    hmap_remove(&installed_flows, &i->match_hmap_node);
+                    hmap_remove(installed_flows, &i->match_hmap_node);
                     installed_flow_destroy(i);
                 } else if (was_active) {
                     /* There are other desired flow(s) referencing this
@@ -1993,7 +2007,8 @@  update_installed_flows_by_track(struct ovn_desired_flow_table *flow_table,
             desired_flow_destroy(f);
         } else {
             /* The desired flow was added or modified. */
-            struct installed_flow *i = installed_flow_lookup(&f->flow);
+            struct installed_flow *i = installed_flow_lookup(&f->flow,
+                                                             installed_flows);
             if (!i) {
                 /* Adding a new flow. */
                 installed_flow_add(&f->flow, bc, msgs);
@@ -2001,7 +2016,7 @@  update_installed_flows_by_track(struct ovn_desired_flow_table *flow_table,
 
                 /* Copy 'f' from 'flow_table' to installed_flows. */
                 struct installed_flow *new_node = installed_flow_dup(f);
-                hmap_insert(&installed_flows, &new_node->match_hmap_node,
+                hmap_insert(installed_flows, &new_node->match_hmap_node,
                             new_node->flow.hash);
                 link_installed_to_desired(new_node, f);
             } else if (installed_flow_get_active(i) == f) {
@@ -2055,16 +2070,19 @@  ofctrl_can_put(void)
  *
  * This should be called after ofctrl_run() within the main loop. */
 void
-ofctrl_put(struct ovn_desired_flow_table *flow_table,
+ofctrl_put(struct ovn_desired_flow_table *lflow_table,
+           struct ovn_desired_flow_table *pflow_table,
            struct shash *pending_ct_zones,
            const struct sbrec_meter_table *meter_table,
            uint64_t req_cfg,
-           bool flow_changed)
+           bool lflows_changed,
+           bool pflows_changed)
 {
     static bool skipped_last_time = false;
     static uint64_t old_req_cfg = 0;
     bool need_put = false;
-    if (flow_changed || skipped_last_time || need_reinstall_flows) {
+    if (lflows_changed || pflows_changed || skipped_last_time ||
+        need_reinstall_flows) {
         need_put = true;
         old_req_cfg = req_cfg;
     } else if (req_cfg != old_req_cfg) {
@@ -2093,7 +2111,6 @@  ofctrl_put(struct ovn_desired_flow_table *flow_table,
         return;
     }
 
-    skipped_last_time = false;
     need_reinstall_flows = false;
 
     /* OpenFlow messages to send to the switch to bring it up-to-date. */
@@ -2159,12 +2176,35 @@  ofctrl_put(struct ovn_desired_flow_table *flow_table,
     bundle_open = ofputil_encode_bundle_ctrl_request(OFP15_VERSION, &bc);
     ovs_list_push_back(&msgs, &bundle_open->list_node);
 
-    if (flow_table->change_tracked) {
-        update_installed_flows_by_track(flow_table, &bc, &msgs);
-    } else {
-        update_installed_flows_by_compare(flow_table, &bc, &msgs);
+    /* If skipped last time, then process the flow table
+     * (tracked) flows even if lflows_changed is not set.
+     * Same for pflows_changed. */
+    if (lflows_changed || skipped_last_time) {
+        if (lflow_table->change_tracked) {
+            update_installed_flows_by_track(lflow_table, &bc,
+                                            &installed_lflows,
+                                            &msgs);
+        } else {
+            update_installed_flows_by_compare(lflow_table, &bc,
+                                              &installed_lflows,
+                                              &msgs);
+        }
+    }
+
+    if (pflows_changed || skipped_last_time) {
+        if (pflow_table->change_tracked) {
+            update_installed_flows_by_track(pflow_table, &bc,
+                                            &installed_pflows,
+                                            &msgs);
+        } else {
+            update_installed_flows_by_compare(pflow_table, &bc,
+                                              &installed_pflows,
+                                              &msgs);
+        }
     }
 
+    skipped_last_time = false;
+
     if (ovs_list_back(&msgs) == &bundle_open->list_node) {
         /* No flow updates.  Removing the bundle open request. */
         ovs_list_pop_back(&msgs);
@@ -2287,8 +2327,11 @@  ofctrl_put(struct ovn_desired_flow_table *flow_table,
         cur_cfg = req_cfg;
     }
 
-    flow_table->change_tracked = true;
-    ovs_assert(ovs_list_is_empty(&flow_table->tracked_flows));
+    lflow_table->change_tracked = true;
+    ovs_assert(ovs_list_is_empty(&lflow_table->tracked_flows));
+
+    pflow_table->change_tracked = true;
+    ovs_assert(ovs_list_is_empty(&pflow_table->tracked_flows));
 }
 
 /* Looks up the logical port with the name 'port_name' in 'br_int_'.  If
diff --git a/controller/ofctrl.h b/controller/ofctrl.h
index 88769566ac..ead8088c5b 100644
--- a/controller/ofctrl.h
+++ b/controller/ofctrl.h
@@ -52,11 +52,13 @@  void ofctrl_init(struct ovn_extend_table *group_table,
 void ofctrl_run(const struct ovsrec_bridge *br_int,
                 struct shash *pending_ct_zones);
 enum mf_field_id ofctrl_get_mf_field_id(void);
-void ofctrl_put(struct ovn_desired_flow_table *,
+void ofctrl_put(struct ovn_desired_flow_table *lflow_table,
+                struct ovn_desired_flow_table *pflow_table,
                 struct shash *pending_ct_zones,
                 const struct sbrec_meter_table *,
                 uint64_t nb_cfg,
-                bool flow_changed);
+                bool lflow_changed,
+                bool pflow_changed);
 bool ofctrl_can_put(void);
 void ofctrl_wait(void);
 void ofctrl_destroy(void);
diff --git a/controller/ovn-controller.c b/controller/ovn-controller.c
index d48ddc7a27..e3051189b1 100644
--- a/controller/ovn-controller.c
+++ b/controller/ovn-controller.c
@@ -563,7 +563,7 @@  add_pending_ct_zone_entry(struct shash *pending_ct_zones,
 static void
 update_ct_zones(const struct sset *lports, const struct hmap *local_datapaths,
                 struct simap *ct_zones, unsigned long *ct_zone_bitmap,
-                struct shash *pending_ct_zones, struct hmapx *updated_dps)
+                struct shash *pending_ct_zones)
 {
     struct simap_node *ct_zone, *ct_zone_next;
     int scan_start = 1;
@@ -653,11 +653,6 @@  update_ct_zones(const struct sset *lports, const struct hmap *local_datapaths,
 
         bitmap_set1(ct_zone_bitmap, snat_req_node->data);
         simap_put(ct_zones, snat_req_node->name, snat_req_node->data);
-        struct shash_node *ld_node = shash_find(&all_lds, snat_req_node->name);
-        if (ld_node) {
-            struct local_datapath *dp = ld_node->data;
-            hmapx_add(updated_dps, (void *) dp->datapath);
-        }
     }
 
     /* xxx This is wasteful to assign a zone to each port--even if no
@@ -686,12 +681,6 @@  update_ct_zones(const struct sset *lports, const struct hmap *local_datapaths,
 
         bitmap_set1(ct_zone_bitmap, zone);
         simap_put(ct_zones, user, zone);
-
-        struct shash_node *ld_node = shash_find(&all_lds, user);
-        if (ld_node) {
-            struct local_datapath *dp = ld_node->data;
-            hmapx_add(updated_dps, (void *) dp->datapath);
-        }
     }
 
     simap_destroy(&req_snat_zones);
@@ -983,9 +972,6 @@  struct ed_type_runtime_data {
     bool tracked;
     bool local_lports_changed;
     struct hmap tracked_dp_bindings;
-
-    /* CT zone data. Contains datapaths that had updated CT zones */
-    struct hmapx ct_updated_datapaths;
 };
 
 /* struct ed_type_runtime_data has the below members for tracking the
@@ -1077,8 +1063,6 @@  en_runtime_data_init(struct engine_node *node OVS_UNUSED,
     /* Init the tracked data. */
     hmap_init(&data->tracked_dp_bindings);
 
-    hmapx_init(&data->ct_updated_datapaths);
-
     return data;
 }
 
@@ -1101,7 +1085,6 @@  en_runtime_data_cleanup(void *data)
     }
     hmap_destroy(&rt_data->local_datapaths);
     local_binding_data_destroy(&rt_data->lbinding_data);
-    hmapx_destroy(&rt_data->ct_updated_datapaths);
 }
 
 static void
@@ -1224,7 +1207,6 @@  en_runtime_data_run(struct engine_node *node, void *data)
         sset_init(&rt_data->egress_ifaces);
         smap_init(&rt_data->local_iface_ids);
         local_binding_data_init(&rt_data->lbinding_data);
-        hmapx_clear(&rt_data->ct_updated_datapaths);
     }
 
     struct binding_ctx_in b_ctx_in;
@@ -1744,10 +1726,9 @@  en_ct_zones_run(struct engine_node *node, void *data)
     struct ed_type_runtime_data *rt_data =
         engine_get_input_data("runtime_data", node);
 
-    hmapx_clear(&rt_data->ct_updated_datapaths);
     update_ct_zones(&rt_data->local_lports, &rt_data->local_datapaths,
                     &ct_zones_data->current, ct_zones_data->bitmap,
-                    &ct_zones_data->pending, &rt_data->ct_updated_datapaths);
+                    &ct_zones_data->pending);
 
 
     engine_set_node_state(node, EN_UPDATED);
@@ -1790,107 +1771,13 @@  en_mff_ovn_geneve_run(struct engine_node *node, void *data)
     engine_set_node_state(node, EN_UNCHANGED);
 }
 
-/* Engine node en_physical_flow_changes indicates whether
- * there is a need to
- *   - recompute only physical flows or
- *   - we can incrementally process the physical flows.
- *
- * en_physical_flow_changes is an input to flow_output engine node.
- * If the engine node 'en_physical_flow_changes' gets updated during
- * engine run, it means the handler for this -
- * flow_output_physical_flow_changes_handler() will either
- *    - recompute the physical flows by calling 'physical_run() or
- *    - incrementlly process some of the changes for physical flow
- *      calculation. Right now we handle OVS interfaces changes
- *      for physical flow computation.
- *
- * When ever a port binding happens, the follow up
- * activity is the zone id allocation for that port binding.
- * With this intermediate engine node, we avoid full recomputation.
- * Instead we do physical flow computation (either full recomputation
- * by calling physical_run() or handling the changes incrementally.
- *
- * Hence this is an intermediate engine node to indicate the
- * flow_output engine to recomputes/compute the physical flows.
- *
- * TODO 1. Ideally this engine node should recompute/compute the physical
- *         flows instead of relegating it to the flow_output node.
- *         But this requires splitting the flow_output node to
- *         logical_flow_output and physical_flow_output.
- *
- * TODO 2. We can further optimise the en_ct_zone changes to
- *         compute the phsyical flows for changed zone ids.
- *
- * TODO 3: physical.c has a global simap -localvif_to_ofport which stores the
- *         local OVS interfaces and the ofport numbers. Ideally this should be
- *         part of the engine data.
- */
-struct ed_type_pfc_data {
-    /* Both these variables are tracked and set in each engine run. */
-    bool recompute_physical_flows;
-    bool ovs_ifaces_changed;
-};
-
-static void
-en_physical_flow_changes_clear_tracked_data(void *data_)
-{
-    struct ed_type_pfc_data *data = data_;
-    data->recompute_physical_flows = false;
-    data->ovs_ifaces_changed = false;
-}
-
-static void *
-en_physical_flow_changes_init(struct engine_node *node OVS_UNUSED,
-                              struct engine_arg *arg OVS_UNUSED)
-{
-    struct ed_type_pfc_data *data = xzalloc(sizeof *data);
-    return data;
-}
-
-static void
-en_physical_flow_changes_cleanup(void *data OVS_UNUSED)
-{
-}
-
-/* Indicate to the flow_output engine that we need to recompute physical
- * flows. */
-static void
-en_physical_flow_changes_run(struct engine_node *node, void *data)
-{
-    struct ed_type_pfc_data *pfc_tdata = data;
-    pfc_tdata->recompute_physical_flows = true;
-    pfc_tdata->ovs_ifaces_changed = true;
-    engine_set_node_state(node, EN_UPDATED);
-}
-
-/* ct_zone changes are not handled incrementally but a handler is required
- * to avoid skipping the ovs_iface incremental change handler.
- */
-static bool
-physical_flow_changes_ct_zones_handler(struct engine_node *node OVS_UNUSED,
-                                       void *data OVS_UNUSED)
-{
-    return false;
-}
-
-/* There are OVS interface changes. Indicate to the flow_output engine
- * to handle these OVS interface changes for physical flow computations. */
-static bool
-physical_flow_changes_ovs_iface_handler(struct engine_node *node, void *data)
-{
-    struct ed_type_pfc_data *pfc_tdata = data;
-    pfc_tdata->ovs_ifaces_changed = true;
-    engine_set_node_state(node, EN_UPDATED);
-    return true;
-}
-
-struct flow_output_persistent_data {
+struct lflow_output_persistent_data {
     uint32_t conj_id_ofs;
     struct lflow_cache *lflow_cache;
 };
 
-struct ed_type_flow_output {
-    /* desired flows */
+struct ed_type_lflow_output {
+    /* Logical flow table */
     struct ovn_desired_flow_table flow_table;
     /* group ids for load balancing */
     struct ovn_extend_table group_table;
@@ -1901,81 +1788,15 @@  struct ed_type_flow_output {
 
     /* Data which is persistent and not cleared during
      * full recompute. */
-    struct flow_output_persistent_data pd;
+    struct lflow_output_persistent_data pd;
 };
 
-static void init_physical_ctx(struct engine_node *node,
-                              struct ed_type_runtime_data *rt_data,
-                              struct physical_ctx *p_ctx)
-{
-    struct ovsdb_idl_index *sbrec_port_binding_by_name =
-        engine_ovsdb_node_get_index(
-                engine_get_input("SB_port_binding", node),
-                "name");
-
-    struct sbrec_multicast_group_table *multicast_group_table =
-        (struct sbrec_multicast_group_table *)EN_OVSDB_GET(
-            engine_get_input("SB_multicast_group", node));
-
-    struct sbrec_port_binding_table *port_binding_table =
-        (struct sbrec_port_binding_table *)EN_OVSDB_GET(
-            engine_get_input("SB_port_binding", node));
-
-    struct sbrec_chassis_table *chassis_table =
-        (struct sbrec_chassis_table *)EN_OVSDB_GET(
-            engine_get_input("SB_chassis", node));
-
-    struct ed_type_mff_ovn_geneve *ed_mff_ovn_geneve =
-        engine_get_input_data("mff_ovn_geneve", node);
-
-    struct ovsrec_open_vswitch_table *ovs_table =
-        (struct ovsrec_open_vswitch_table *)EN_OVSDB_GET(
-            engine_get_input("OVS_open_vswitch", node));
-    struct ovsrec_bridge_table *bridge_table =
-        (struct ovsrec_bridge_table *)EN_OVSDB_GET(
-            engine_get_input("OVS_bridge", node));
-    const struct ovsrec_bridge *br_int = get_br_int(bridge_table, ovs_table);
-    const char *chassis_id = get_ovs_chassis_id(ovs_table);
-    const struct sbrec_chassis *chassis = NULL;
-    struct ovsdb_idl_index *sbrec_chassis_by_name =
-        engine_ovsdb_node_get_index(
-                engine_get_input("SB_chassis", node),
-                "name");
-    if (chassis_id) {
-        chassis = chassis_lookup_by_name(sbrec_chassis_by_name, chassis_id);
-    }
-
-    ovs_assert(br_int && chassis);
-
-    struct ovsrec_interface_table *iface_table =
-        (struct ovsrec_interface_table *)EN_OVSDB_GET(
-            engine_get_input("OVS_interface", node));
-
-    struct ed_type_ct_zones *ct_zones_data =
-        engine_get_input_data("ct_zones", node);
-    struct simap *ct_zones = &ct_zones_data->current;
-
-    p_ctx->sbrec_port_binding_by_name = sbrec_port_binding_by_name;
-    p_ctx->port_binding_table = port_binding_table;
-    p_ctx->mc_group_table = multicast_group_table;
-    p_ctx->br_int = br_int;
-    p_ctx->chassis_table = chassis_table;
-    p_ctx->iface_table = iface_table;
-    p_ctx->chassis = chassis;
-    p_ctx->active_tunnels = &rt_data->active_tunnels;
-    p_ctx->local_datapaths = &rt_data->local_datapaths;
-    p_ctx->local_lports = &rt_data->local_lports;
-    p_ctx->ct_zones = ct_zones;
-    p_ctx->mff_ovn_geneve = ed_mff_ovn_geneve->mff_ovn_geneve;
-    p_ctx->local_bindings = &rt_data->lbinding_data.bindings;
-    p_ctx->ct_updated_datapaths = &rt_data->ct_updated_datapaths;
-}
-
-static void init_lflow_ctx(struct engine_node *node,
-                           struct ed_type_runtime_data *rt_data,
-                           struct ed_type_flow_output *fo,
-                           struct lflow_ctx_in *l_ctx_in,
-                           struct lflow_ctx_out *l_ctx_out)
+static void
+init_lflow_ctx(struct engine_node *node,
+               struct ed_type_runtime_data *rt_data,
+               struct ed_type_lflow_output *fo,
+               struct lflow_ctx_in *l_ctx_in,
+               struct lflow_ctx_out *l_ctx_out)
 {
     struct ovsdb_idl_index *sbrec_port_binding_by_name =
         engine_ovsdb_node_get_index(
@@ -2085,11 +1906,10 @@  static void init_lflow_ctx(struct engine_node *node,
 }
 
 static void *
-en_flow_output_init(struct engine_node *node OVS_UNUSED,
-                    struct engine_arg *arg OVS_UNUSED)
+en_lflow_output_init(struct engine_node *node OVS_UNUSED,
+                     struct engine_arg *arg OVS_UNUSED)
 {
-    struct ed_type_flow_output *data = xzalloc(sizeof *data);
-
+    struct ed_type_lflow_output *data = xzalloc(sizeof *data);
     ovn_desired_flow_table_init(&data->flow_table);
     ovn_extend_table_init(&data->group_table);
     ovn_extend_table_init(&data->meter_table);
@@ -2099,9 +1919,9 @@  en_flow_output_init(struct engine_node *node OVS_UNUSED,
 }
 
 static void
-en_flow_output_cleanup(void *data)
+en_lflow_output_cleanup(void *data)
 {
-    struct ed_type_flow_output *flow_output_data = data;
+    struct ed_type_lflow_output *flow_output_data = data;
     ovn_desired_flow_table_destroy(&flow_output_data->flow_table);
     ovn_extend_table_destroy(&flow_output_data->group_table);
     ovn_extend_table_destroy(&flow_output_data->meter_table);
@@ -2110,7 +1930,7 @@  en_flow_output_cleanup(void *data)
 }
 
 static void
-en_flow_output_run(struct engine_node *node, void *data)
+en_lflow_output_run(struct engine_node *node, void *data)
 {
     struct ed_type_runtime_data *rt_data =
         engine_get_input_data("runtime_data", node);
@@ -2136,8 +1956,8 @@  en_flow_output_run(struct engine_node *node, void *data)
 
     ovs_assert(br_int && chassis);
 
-    struct ed_type_flow_output *fo = data;
-    struct ovn_desired_flow_table *flow_table = &fo->flow_table;
+    struct ed_type_lflow_output *fo = data;
+    struct ovn_desired_flow_table *lflow_table = &fo->flow_table;
     struct ovn_extend_table *group_table = &fo->group_table;
     struct ovn_extend_table *meter_table = &fo->meter_table;
     struct lflow_resource_ref *lfrr = &fo->lflow_resource_ref;
@@ -2146,7 +1966,7 @@  en_flow_output_run(struct engine_node *node, void *data)
     if (first_run) {
         first_run = false;
     } else {
-        ovn_desired_flow_table_clear(flow_table);
+        ovn_desired_flow_table_clear(lflow_table);
         ovn_extend_table_clear(group_table, false /* desired */);
         ovn_extend_table_clear(meter_table, false /* desired */);
         lflow_resource_clear(lfrr);
@@ -2168,7 +1988,7 @@  en_flow_output_run(struct engine_node *node, void *data)
     if (l_ctx_out.conj_id_overflow) {
         /* Conjunction ids overflow. There can be many holes in between.
          * Destroy lflow cache and call lflow_run() again. */
-        ovn_desired_flow_table_clear(flow_table);
+        ovn_desired_flow_table_clear(lflow_table);
         ovn_extend_table_clear(group_table, false /* desired */);
         ovn_extend_table_clear(meter_table, false /* desired */);
         lflow_resource_clear(lfrr);
@@ -2181,16 +2001,11 @@  en_flow_output_run(struct engine_node *node, void *data)
         }
     }
 
-    struct physical_ctx p_ctx;
-    init_physical_ctx(node, rt_data, &p_ctx);
-
-    physical_run(&p_ctx, &fo->flow_table);
-
     engine_set_node_state(node, EN_UPDATED);
 }
 
 static bool
-flow_output_sb_logical_flow_handler(struct engine_node *node, void *data)
+lflow_output_sb_logical_flow_handler(struct engine_node *node, void *data)
 {
     struct ed_type_runtime_data *rt_data =
         engine_get_input_data("runtime_data", node);
@@ -2203,7 +2018,7 @@  flow_output_sb_logical_flow_handler(struct engine_node *node, void *data)
     const struct ovsrec_bridge *br_int = get_br_int(bridge_table, ovs_table);
     ovs_assert(br_int);
 
-    struct ed_type_flow_output *fo = data;
+    struct ed_type_lflow_output *fo = data;
     struct lflow_ctx_in l_ctx_in;
     struct lflow_ctx_out l_ctx_out;
     init_lflow_ctx(node, rt_data, fo, &l_ctx_in, &l_ctx_out);
@@ -2215,7 +2030,7 @@  flow_output_sb_logical_flow_handler(struct engine_node *node, void *data)
 }
 
 static bool
-flow_output_sb_mac_binding_handler(struct engine_node *node, void *data)
+lflow_output_sb_mac_binding_handler(struct engine_node *node, void *data)
 {
     struct ovsdb_idl_index *sbrec_port_binding_by_name =
         engine_ovsdb_node_get_index(
@@ -2230,60 +2045,17 @@  flow_output_sb_mac_binding_handler(struct engine_node *node, void *data)
         engine_get_input_data("runtime_data", node);
     const struct hmap *local_datapaths = &rt_data->local_datapaths;
 
-    struct ed_type_flow_output *fo = data;
-    struct ovn_desired_flow_table *flow_table = &fo->flow_table;
+    struct ed_type_lflow_output *lfo = data;
 
     lflow_handle_changed_neighbors(sbrec_port_binding_by_name,
-            mac_binding_table, local_datapaths, flow_table);
+            mac_binding_table, local_datapaths, &lfo->flow_table);
 
     engine_set_node_state(node, EN_UPDATED);
     return true;
 }
 
 static bool
-flow_output_sb_port_binding_handler(struct engine_node *node,
-                                    void *data)
-{
-    struct ed_type_runtime_data *rt_data =
-        engine_get_input_data("runtime_data", node);
-
-    struct ed_type_flow_output *fo = data;
-    struct ovn_desired_flow_table *flow_table = &fo->flow_table;
-
-    struct physical_ctx p_ctx;
-    init_physical_ctx(node, rt_data, &p_ctx);
-
-    /* We handle port-binding changes for physical flow processing
-     * only. flow_output runtime data handler takes care of processing
-     * logical flows for any port binding changes.
-     */
-    physical_handle_port_binding_changes(&p_ctx, flow_table);
-
-    engine_set_node_state(node, EN_UPDATED);
-    return true;
-}
-
-static bool
-flow_output_sb_multicast_group_handler(struct engine_node *node, void *data)
-{
-    struct ed_type_runtime_data *rt_data =
-        engine_get_input_data("runtime_data", node);
-
-    struct ed_type_flow_output *fo = data;
-    struct ovn_desired_flow_table *flow_table = &fo->flow_table;
-
-    struct physical_ctx p_ctx;
-    init_physical_ctx(node, rt_data, &p_ctx);
-
-    physical_handle_mc_group_changes(&p_ctx, flow_table);
-
-    engine_set_node_state(node, EN_UPDATED);
-    return true;
-
-}
-
-static bool
-_flow_output_resource_ref_handler(struct engine_node *node, void *data,
+_lflow_output_resource_ref_handler(struct engine_node *node, void *data,
                                   enum ref_type ref_type)
 {
     struct ed_type_runtime_data *rt_data =
@@ -2315,7 +2087,7 @@  _flow_output_resource_ref_handler(struct engine_node *node, void *data,
 
     ovs_assert(br_int && chassis);
 
-    struct ed_type_flow_output *fo = data;
+    struct ed_type_lflow_output *fo = data;
 
     struct lflow_ctx_in l_ctx_in;
     struct lflow_ctx_out l_ctx_out;
@@ -2384,53 +2156,20 @@  _flow_output_resource_ref_handler(struct engine_node *node, void *data,
 }
 
 static bool
-flow_output_addr_sets_handler(struct engine_node *node, void *data)
+lflow_output_addr_sets_handler(struct engine_node *node, void *data)
 {
-    return _flow_output_resource_ref_handler(node, data, REF_TYPE_ADDRSET);
+    return _lflow_output_resource_ref_handler(node, data, REF_TYPE_ADDRSET);
 }
 
 static bool
-flow_output_port_groups_handler(struct engine_node *node, void *data)
+lflow_output_port_groups_handler(struct engine_node *node, void *data)
 {
-    return _flow_output_resource_ref_handler(node, data, REF_TYPE_PORTGROUP);
-}
-
-static bool
-flow_output_physical_flow_changes_handler(struct engine_node *node, void *data)
-{
-    struct ed_type_runtime_data *rt_data =
-        engine_get_input_data("runtime_data", node);
-
-    struct ed_type_flow_output *fo = data;
-    struct physical_ctx p_ctx;
-    init_physical_ctx(node, rt_data, &p_ctx);
-
-    engine_set_node_state(node, EN_UPDATED);
-    struct ed_type_pfc_data *pfc_data =
-        engine_get_input_data("physical_flow_changes", node);
-
-    /* If there are OVS interface changes. Try to handle them incrementally. */
-    if (pfc_data->ovs_ifaces_changed) {
-        if (!physical_handle_ovs_iface_changes(&p_ctx, &fo->flow_table)) {
-            return false;
-        }
-    }
-
-    if (pfc_data->recompute_physical_flows) {
-        /* This indicates that we need to recompute the physical flows. */
-        physical_clear_unassoc_flows_with_db(&fo->flow_table);
-        physical_clear_dp_flows(&p_ctx, &rt_data->ct_updated_datapaths,
-                                &fo->flow_table);
-        physical_run(&p_ctx, &fo->flow_table);
-        return true;
-    }
-
-    return true;
+    return _lflow_output_resource_ref_handler(node, data, REF_TYPE_PORTGROUP);
 }
 
 static bool
-flow_output_runtime_data_handler(struct engine_node *node,
-                                 void *data OVS_UNUSED)
+lflow_output_runtime_data_handler(struct engine_node *node,
+                                  void *data OVS_UNUSED)
 {
     struct ed_type_runtime_data *rt_data =
         engine_get_input_data("runtime_data", node);
@@ -2451,12 +2190,9 @@  flow_output_runtime_data_handler(struct engine_node *node,
 
     struct lflow_ctx_in l_ctx_in;
     struct lflow_ctx_out l_ctx_out;
-    struct ed_type_flow_output *fo = data;
+    struct ed_type_lflow_output *fo = data;
     init_lflow_ctx(node, rt_data, fo, &l_ctx_in, &l_ctx_out);
 
-    struct physical_ctx p_ctx;
-    init_physical_ctx(node, rt_data, &p_ctx);
-
     struct tracked_binding_datapath *tdp;
     HMAP_FOR_EACH (tdp, node, tracked_dp_bindings) {
         if (tdp->is_new) {
@@ -2481,12 +2217,12 @@  flow_output_runtime_data_handler(struct engine_node *node,
 }
 
 static bool
-flow_output_sb_load_balancer_handler(struct engine_node *node, void *data)
+lflow_output_sb_load_balancer_handler(struct engine_node *node, void *data)
 {
     struct ed_type_runtime_data *rt_data =
         engine_get_input_data("runtime_data", node);
 
-    struct ed_type_flow_output *fo = data;
+    struct ed_type_lflow_output *fo = data;
     struct lflow_ctx_in l_ctx_in;
     struct lflow_ctx_out l_ctx_out;
     init_lflow_ctx(node, rt_data, fo, &l_ctx_in, &l_ctx_out);
@@ -2498,12 +2234,12 @@  flow_output_sb_load_balancer_handler(struct engine_node *node, void *data)
 }
 
 static bool
-flow_output_sb_fdb_handler(struct engine_node *node, void *data)
+lflow_output_sb_fdb_handler(struct engine_node *node, void *data)
 {
     struct ed_type_runtime_data *rt_data =
         engine_get_input_data("runtime_data", node);
 
-    struct ed_type_flow_output *fo = data;
+    struct ed_type_lflow_output *fo = data;
     struct lflow_ctx_in l_ctx_in;
     struct lflow_ctx_out l_ctx_out;
     init_lflow_ctx(node, rt_data, fo, &l_ctx_in, &l_ctx_out);
@@ -2514,6 +2250,230 @@  flow_output_sb_fdb_handler(struct engine_node *node, void *data)
     return handled;
 }
 
+struct ed_type_pflow_output {
+    /* Desired physical flows. */
+    struct ovn_desired_flow_table flow_table;
+};
+
+static void init_physical_ctx(struct engine_node *node,
+                              struct ed_type_runtime_data *rt_data,
+                              struct physical_ctx *p_ctx)
+{
+    struct ovsdb_idl_index *sbrec_port_binding_by_name =
+        engine_ovsdb_node_get_index(
+                engine_get_input("SB_port_binding", node),
+                "name");
+
+    struct sbrec_multicast_group_table *multicast_group_table =
+        (struct sbrec_multicast_group_table *)EN_OVSDB_GET(
+            engine_get_input("SB_multicast_group", node));
+
+    struct sbrec_port_binding_table *port_binding_table =
+        (struct sbrec_port_binding_table *)EN_OVSDB_GET(
+            engine_get_input("SB_port_binding", node));
+
+    struct sbrec_chassis_table *chassis_table =
+        (struct sbrec_chassis_table *)EN_OVSDB_GET(
+            engine_get_input("SB_chassis", node));
+
+    struct ed_type_mff_ovn_geneve *ed_mff_ovn_geneve =
+        engine_get_input_data("mff_ovn_geneve", node);
+
+    struct ovsrec_open_vswitch_table *ovs_table =
+        (struct ovsrec_open_vswitch_table *)EN_OVSDB_GET(
+            engine_get_input("OVS_open_vswitch", node));
+    struct ovsrec_bridge_table *bridge_table =
+        (struct ovsrec_bridge_table *)EN_OVSDB_GET(
+            engine_get_input("OVS_bridge", node));
+    const struct ovsrec_bridge *br_int = get_br_int(bridge_table, ovs_table);
+    const char *chassis_id = get_ovs_chassis_id(ovs_table);
+    const struct sbrec_chassis *chassis = NULL;
+    struct ovsdb_idl_index *sbrec_chassis_by_name =
+        engine_ovsdb_node_get_index(
+                engine_get_input("SB_chassis", node),
+                "name");
+    if (chassis_id) {
+        chassis = chassis_lookup_by_name(sbrec_chassis_by_name, chassis_id);
+    }
+
+    ovs_assert(br_int && chassis);
+
+    struct ovsrec_interface_table *iface_table =
+        (struct ovsrec_interface_table *)EN_OVSDB_GET(
+            engine_get_input("OVS_interface", node));
+
+    struct ed_type_ct_zones *ct_zones_data =
+        engine_get_input_data("ct_zones", node);
+    struct simap *ct_zones = &ct_zones_data->current;
+
+    p_ctx->sbrec_port_binding_by_name = sbrec_port_binding_by_name;
+    p_ctx->port_binding_table = port_binding_table;
+    p_ctx->mc_group_table = multicast_group_table;
+    p_ctx->br_int = br_int;
+    p_ctx->chassis_table = chassis_table;
+    p_ctx->iface_table = iface_table;
+    p_ctx->chassis = chassis;
+    p_ctx->active_tunnels = &rt_data->active_tunnels;
+    p_ctx->local_datapaths = &rt_data->local_datapaths;
+    p_ctx->local_lports = &rt_data->local_lports;
+    p_ctx->ct_zones = ct_zones;
+    p_ctx->mff_ovn_geneve = ed_mff_ovn_geneve->mff_ovn_geneve;
+    p_ctx->local_bindings = &rt_data->lbinding_data.bindings;
+}
+
+static void *
+en_pflow_output_init(struct engine_node *node OVS_UNUSED,
+                     struct engine_arg *arg OVS_UNUSED)
+{
+    struct ed_type_pflow_output *data = xzalloc(sizeof *data);
+    ovn_desired_flow_table_init(&data->flow_table);
+    return data;
+}
+
+static void
+en_pflow_output_cleanup(void *data)
+{
+    struct ed_type_pflow_output *pfo = data;
+    ovn_desired_flow_table_destroy(&pfo->flow_table);
+}
+
+static void
+en_pflow_output_run(struct engine_node *node, void *data)
+{
+    struct ed_type_pflow_output *pfo = data;
+    struct ovn_desired_flow_table *pflow_table = &pfo->flow_table;
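+    /* The flow table is already empty right after en_pflow_output_init(),
+     * so only clear it on subsequent full recomputes. */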
+    static bool first_run = true;
+    if (first_run) {
+        first_run = false;
+    } else {
+        ovn_desired_flow_table_clear(pflow_table);
+    }
+
+    struct ed_type_runtime_data *rt_data =
+        engine_get_input_data("runtime_data", node);
+
+    struct physical_ctx p_ctx;
+    init_physical_ctx(node, rt_data, &p_ctx);
+    physical_run(&p_ctx, pflow_table);
+
+    engine_set_node_state(node, EN_UPDATED);
+}
+
+static bool
+pflow_output_sb_port_binding_handler(struct engine_node *node,
+                                     void *data)
+{
+    struct ed_type_runtime_data *rt_data =
+        engine_get_input_data("runtime_data", node);
+
+    struct ed_type_pflow_output *pfo = data;
+
+    struct physical_ctx p_ctx;
+    init_physical_ctx(node, rt_data, &p_ctx);
+
+    /* We handle port binding changes for physical flow processing
+     * only.  The lflow_output node's runtime_data handler takes care
+     * of processing logical flows for any port binding changes.
+     */
+    physical_handle_port_binding_changes(&p_ctx, &pfo->flow_table);
+
+    engine_set_node_state(node, EN_UPDATED);
+    return true;
+}
+
+static bool
+pflow_output_sb_multicast_group_handler(struct engine_node *node, void *data)
+{
+    struct ed_type_runtime_data *rt_data =
+        engine_get_input_data("runtime_data", node);
+
+    struct ed_type_pflow_output *pfo = data;
+
+    struct physical_ctx p_ctx;
+    init_physical_ctx(node, rt_data, &p_ctx);
+
+    physical_handle_mc_group_changes(&p_ctx, &pfo->flow_table);
+
+    engine_set_node_state(node, EN_UPDATED);
+    return true;
+}
+
+static bool
+pflow_output_ovs_iface_handler(struct engine_node *node, void *data)
+{
+    struct ed_type_runtime_data *rt_data =
+        engine_get_input_data("runtime_data", node);
+
+    struct ed_type_pflow_output *pfo = data;
+
+    struct physical_ctx p_ctx;
+    init_physical_ctx(node, rt_data, &p_ctx);
+
+    engine_set_node_state(node, EN_UPDATED);
+    return physical_handle_ovs_iface_changes(&p_ctx, &pfo->flow_table);
+}
+
+/* Handles sbrec_chassis changes.
+ * If a chassis is added or removed, return false so that the physical
+ * flows are recomputed.
+ * For chassis updates there is no need for any flow computation.
+ * Encap changes will also result in sbrec_chassis changes, but encap
+ * changes are handled separately.
+ */
+static bool
+pflow_output_sb_chassis_handler(struct engine_node *node,
+                                void *data OVS_UNUSED)
+{
+    struct sbrec_chassis_table *chassis_table =
+        (struct sbrec_chassis_table *)EN_OVSDB_GET(
+            engine_get_input("SB_chassis", node));
+
+    const struct sbrec_chassis *ch;
+    SBREC_CHASSIS_TABLE_FOR_EACH_TRACKED (ch, chassis_table) {
+        if (sbrec_chassis_is_deleted(ch) || sbrec_chassis_is_new(ch)) {
+            return false;
+        }
+    }
+
+    return true;
+}
+
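+/* The 'flow_output' engine node no longer carries any flow data of its
+ * own.  It only aggregates 'lflow_output' and 'pflow_output' so that the
+ * rest of the engine still has a single flow output node to depend on. */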
+static void *
+en_flow_output_init(struct engine_node *node OVS_UNUSED,
+                    struct engine_arg *arg OVS_UNUSED)
+{
+    return NULL;
+}
+
+static void
+en_flow_output_cleanup(void *data OVS_UNUSED)
+{
+}
+
+static void
+en_flow_output_run(struct engine_node *node, void *data OVS_UNUSED)
+{
+    engine_set_node_state(node, EN_UPDATED);
+}
+
+static bool
+flow_output_pflow_output_handler(struct engine_node *node,
+                                 void *data OVS_UNUSED)
+{
+    engine_set_node_state(node, EN_UPDATED);
+    return true;
+}
+
+static bool
+flow_output_lflow_output_handler(struct engine_node *node,
+                                 void *data OVS_UNUSED)
+{
+    engine_set_node_state(node, EN_UPDATED);
+    return true;
+}
+
 struct ovn_controller_exit_args {
     bool *exiting;
     bool *restart;
@@ -2706,8 +2666,8 @@  main(int argc, char *argv[])
     ENGINE_NODE_WITH_CLEAR_TRACK_DATA(runtime_data, "runtime_data");
     ENGINE_NODE(mff_ovn_geneve, "mff_ovn_geneve");
     ENGINE_NODE(ofctrl_is_connected, "ofctrl_is_connected");
-    ENGINE_NODE_WITH_CLEAR_TRACK_DATA(physical_flow_changes,
-                                      "physical_flow_changes");
+    ENGINE_NODE(pflow_output, "physical_flow_output");
+    ENGINE_NODE(lflow_output, "logical_flow_output");
     ENGINE_NODE(flow_output, "flow_output");
     ENGINE_NODE(addr_sets, "addr_sets");
     ENGINE_NODE_WITH_CLEAR_TRACK_DATA(port_groups, "port_groups");
@@ -2731,58 +2691,71 @@  main(int argc, char *argv[])
     engine_add_input(&en_port_groups, &en_runtime_data,
                      port_groups_runtime_data_handler);
 
-    /* Engine node physical_flow_changes indicates whether
-     * we can recompute only physical flows or we can
-     * incrementally process the physical flows.
-     *
-     * Note: The order of inputs is important, all OVS interface changes must
+    /* Note: The order of inputs is important: all OVS interface changes must
      * be handled before any ct_zone changes.
      */
-    engine_add_input(&en_physical_flow_changes, &en_ovs_interface,
-                     physical_flow_changes_ovs_iface_handler);
-    engine_add_input(&en_physical_flow_changes, &en_ct_zones,
-                     physical_flow_changes_ct_zones_handler);
-
-    engine_add_input(&en_flow_output, &en_addr_sets,
-                     flow_output_addr_sets_handler);
-    engine_add_input(&en_flow_output, &en_port_groups,
-                     flow_output_port_groups_handler);
-    engine_add_input(&en_flow_output, &en_runtime_data,
-                     flow_output_runtime_data_handler);
-    engine_add_input(&en_flow_output, &en_mff_ovn_geneve, NULL);
-    engine_add_input(&en_flow_output, &en_physical_flow_changes,
-                     flow_output_physical_flow_changes_handler);
+    engine_add_input(&en_pflow_output, &en_ovs_interface,
+                     pflow_output_ovs_iface_handler);
+    engine_add_input(&en_pflow_output, &en_ct_zones,
+                     NULL);
+    engine_add_input(&en_pflow_output, &en_sb_chassis,
+                     pflow_output_sb_chassis_handler);
+    engine_add_input(&en_pflow_output, &en_sb_port_binding,
+                     pflow_output_sb_port_binding_handler);
+    engine_add_input(&en_pflow_output, &en_sb_multicast_group,
+                     pflow_output_sb_multicast_group_handler);
+
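+    /* Inputs registered with a NULL change handler trigger a full
+     * recompute of pflow_output whenever they change. */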
+    engine_add_input(&en_pflow_output, &en_runtime_data,
+                     NULL);
+    engine_add_input(&en_pflow_output, &en_sb_encap, NULL);
+    engine_add_input(&en_pflow_output, &en_mff_ovn_geneve, NULL);
+    engine_add_input(&en_pflow_output, &en_ovs_open_vswitch, NULL);
+    engine_add_input(&en_pflow_output, &en_ovs_bridge, NULL);
+
+    engine_add_input(&en_lflow_output, &en_addr_sets,
+                     lflow_output_addr_sets_handler);
+    engine_add_input(&en_lflow_output, &en_port_groups,
+                     lflow_output_port_groups_handler);
+    engine_add_input(&en_lflow_output, &en_runtime_data,
+                     lflow_output_runtime_data_handler);
 
     /* We need these input nodes only for data.  Hence the noop handler. */
-    engine_add_input(&en_flow_output, &en_ct_zones, engine_noop_handler);
-    engine_add_input(&en_flow_output, &en_ovs_interface, engine_noop_handler);
-
-    engine_add_input(&en_flow_output, &en_ovs_open_vswitch, NULL);
-    engine_add_input(&en_flow_output, &en_ovs_bridge, NULL);
-
-    engine_add_input(&en_flow_output, &en_sb_chassis, NULL);
-    engine_add_input(&en_flow_output, &en_sb_encap, NULL);
-    engine_add_input(&en_flow_output, &en_sb_multicast_group,
-                     flow_output_sb_multicast_group_handler);
-    engine_add_input(&en_flow_output, &en_sb_port_binding,
-                     flow_output_sb_port_binding_handler);
-    engine_add_input(&en_flow_output, &en_sb_mac_binding,
-                     flow_output_sb_mac_binding_handler);
-    engine_add_input(&en_flow_output, &en_sb_logical_flow,
-                     flow_output_sb_logical_flow_handler);
+    engine_add_input(&en_lflow_output, &en_ct_zones,
+                     engine_noop_handler);
+    engine_add_input(&en_lflow_output, &en_ovs_interface,
+                     engine_noop_handler);
+    engine_add_input(&en_lflow_output, &en_sb_chassis,
+                     engine_noop_handler);
+    engine_add_input(&en_lflow_output, &en_sb_multicast_group,
+                     engine_noop_handler);
+
+    /* Port binding changes need not be handled by the lflow_output
+     * engine.  We still need sb_port_binding as an input to access the
+     * port binding data in lflow.c, hence the noop handler. */
+    engine_add_input(&en_lflow_output, &en_sb_port_binding,
+                     engine_noop_handler);
+
+    engine_add_input(&en_lflow_output, &en_ovs_open_vswitch, NULL);
+    engine_add_input(&en_lflow_output, &en_ovs_bridge, NULL);
+
+    engine_add_input(&en_lflow_output, &en_sb_mac_binding,
+                     lflow_output_sb_mac_binding_handler);
+    engine_add_input(&en_lflow_output, &en_sb_logical_flow,
+                     lflow_output_sb_logical_flow_handler);
     /* Using a noop handler since we don't really need any data from datapath
      * groups or a full recompute.  Update of a datapath group will put
      * logical flow into the tracked list, so the logical flow handler will
      * process all changes. */
-    engine_add_input(&en_flow_output, &en_sb_logical_dp_group,
+    engine_add_input(&en_lflow_output, &en_sb_logical_dp_group,
                      engine_noop_handler);
-    engine_add_input(&en_flow_output, &en_sb_dhcp_options, NULL);
-    engine_add_input(&en_flow_output, &en_sb_dhcpv6_options, NULL);
-    engine_add_input(&en_flow_output, &en_sb_dns, NULL);
-    engine_add_input(&en_flow_output, &en_sb_load_balancer,
-                     flow_output_sb_load_balancer_handler);
-    engine_add_input(&en_flow_output, &en_sb_fdb,
-                     flow_output_sb_fdb_handler);
+    engine_add_input(&en_lflow_output, &en_sb_dhcp_options, NULL);
+    engine_add_input(&en_lflow_output, &en_sb_dhcpv6_options, NULL);
+    engine_add_input(&en_lflow_output, &en_sb_dns, NULL);
+    engine_add_input(&en_lflow_output, &en_sb_load_balancer,
+                     lflow_output_sb_load_balancer_handler);
+    engine_add_input(&en_lflow_output, &en_sb_fdb,
+                     lflow_output_sb_fdb_handler);
 
     engine_add_input(&en_ct_zones, &en_ovs_open_vswitch, NULL);
     engine_add_input(&en_ct_zones, &en_ovs_bridge, NULL);
@@ -2810,6 +2783,11 @@  main(int argc, char *argv[])
     engine_add_input(&en_runtime_data, &en_ovs_interface,
                      runtime_data_ovs_interface_handler);
 
+    engine_add_input(&en_flow_output, &en_lflow_output,
+                     flow_output_lflow_output_handler);
+    engine_add_input(&en_flow_output, &en_pflow_output,
+                     flow_output_pflow_output_handler);
+
     struct engine_arg engine_arg = {
         .sb_idl = ovnsb_idl_loop.idl,
         .ovs_idl = ovs_idl_loop.idl,
@@ -2832,25 +2810,27 @@  main(int argc, char *argv[])
     engine_ovsdb_node_add_index(&en_sb_datapath_binding, "key",
                                 sbrec_datapath_binding_by_key);
 
-    struct ed_type_flow_output *flow_output_data =
-        engine_get_internal_data(&en_flow_output);
+    struct ed_type_lflow_output *lflow_output_data =
+        engine_get_internal_data(&en_lflow_output);
+    struct ed_type_pflow_output *pflow_output_data =
+        engine_get_internal_data(&en_pflow_output);
     struct ed_type_ct_zones *ct_zones_data =
         engine_get_internal_data(&en_ct_zones);
     struct ed_type_runtime_data *runtime_data =
         engine_get_internal_data(&en_runtime_data);
 
-    ofctrl_init(&flow_output_data->group_table,
-                &flow_output_data->meter_table,
+    ofctrl_init(&lflow_output_data->group_table,
+                &lflow_output_data->meter_table,
                 get_ofctrl_probe_interval(ovs_idl_loop.idl));
     ofctrl_seqno_init();
 
     unixctl_command_register("group-table-list", "", 0, 0,
                              extend_table_list,
-                             &flow_output_data->group_table);
+                             &lflow_output_data->group_table);
 
     unixctl_command_register("meter-table-list", "", 0, 0,
                              extend_table_list,
-                             &flow_output_data->meter_table);
+                             &lflow_output_data->meter_table);
 
     unixctl_command_register("ct-zone-list", "", 0, 0,
                              ct_zone_list,
@@ -2864,14 +2844,14 @@  main(int argc, char *argv[])
                              NULL);
     unixctl_command_register("lflow-cache/flush", "", 0, 0,
                              lflow_cache_flush_cmd,
-                             &flow_output_data->pd);
+                             &lflow_output_data->pd);
     /* Keep deprecated 'flush-lflow-cache' command for now. */
     unixctl_command_register("flush-lflow-cache", "[deprecated]", 0, 0,
                              lflow_cache_flush_cmd,
-                             &flow_output_data->pd);
+                             &lflow_output_data->pd);
     unixctl_command_register("lflow-cache/show-stats", "", 0, 0,
                              lflow_cache_show_stats_cmd,
-                             &flow_output_data->pd);
+                             &lflow_output_data->pd);
 
     bool reset_ovnsb_idl_min_index = false;
     unixctl_command_register("sb-cluster-state-reset", "", 0, 0,
@@ -3117,13 +3097,17 @@  main(int argc, char *argv[])
                         runtime_data ? &runtime_data->lbinding_data : NULL;
                     if_status_mgr_update(if_mgr, binding_data);
 
-                    flow_output_data = engine_get_data(&en_flow_output);
-                    if (flow_output_data && ct_zones_data) {
-                        ofctrl_put(&flow_output_data->flow_table,
+                    lflow_output_data = engine_get_data(&en_lflow_output);
+                    pflow_output_data = engine_get_data(&en_pflow_output);
+                    if (lflow_output_data && pflow_output_data &&
+                        ct_zones_data) {
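+                        /* ofctrl_put() now consumes both desired flow
+                         * tables (logical and physical), each with its own
+                         * 'changed' indication. */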
+                        ofctrl_put(&lflow_output_data->flow_table,
+                                   &pflow_output_data->flow_table,
                                    &ct_zones_data->pending,
                                    sbrec_meter_table_get(ovnsb_idl_loop.idl),
                                    ofctrl_seqno_get_req_cfg(),
-                                   engine_node_changed(&en_flow_output));
+                                   engine_node_changed(&en_lflow_output),
+                                   engine_node_changed(&en_pflow_output));
                     }
                     ofctrl_seqno_run(ofctrl_get_cur_cfg());
                     if_status_mgr_run(if_mgr, binding_data, !ovnsb_idl_txn,
@@ -3491,7 +3475,7 @@  lflow_cache_flush_cmd(struct unixctl_conn *conn OVS_UNUSED,
                       void *arg_)
 {
     VLOG_INFO("User triggered lflow cache flush.");
-    struct flow_output_persistent_data *fo_pd = arg_;
+    struct lflow_output_persistent_data *fo_pd = arg_;
     lflow_cache_flush(fo_pd->lflow_cache);
     fo_pd->conj_id_ofs = 1;
     engine_set_force_recompute(true);
@@ -3503,7 +3487,7 @@  static void
 lflow_cache_show_stats_cmd(struct unixctl_conn *conn, int argc OVS_UNUSED,
                            const char *argv[] OVS_UNUSED, void *arg_)
 {
-    struct flow_output_persistent_data *fo_pd = arg_;
+    struct lflow_output_persistent_data *fo_pd = arg_;
     struct lflow_cache *lc = fo_pd->lflow_cache;
     struct ds ds = DS_EMPTY_INITIALIZER;
 
diff --git a/controller/physical.c b/controller/physical.c
index 018e09540c..04259d44a6 100644
--- a/controller/physical.c
+++ b/controller/physical.c
@@ -1953,22 +1953,3 @@  physical_clear_unassoc_flows_with_db(struct ovn_desired_flow_table *flow_table)
         ofctrl_remove_flows(flow_table, hc_uuid);
     }
 }
-
-void
-physical_clear_dp_flows(struct physical_ctx *p_ctx,
-                        struct hmapx *ct_updated_datapaths,
-                        struct ovn_desired_flow_table *flow_table)
-{
-    const struct sbrec_port_binding *binding;
-    SBREC_PORT_BINDING_TABLE_FOR_EACH (binding, p_ctx->port_binding_table) {
-        if (!hmapx_find(ct_updated_datapaths, binding->datapath)) {
-            continue;
-        }
-        const struct sbrec_port_binding *peer =
-            get_binding_peer(p_ctx->sbrec_port_binding_by_name, binding);
-        ofctrl_remove_flows(flow_table, &binding->header_.uuid);
-        if (peer) {
-            ofctrl_remove_flows(flow_table, &peer->header_.uuid);
-        }
-    }
-}
diff --git a/controller/physical.h b/controller/physical.h
index 0bf13f2683..feab41df4c 100644
--- a/controller/physical.h
+++ b/controller/physical.h
@@ -56,16 +56,12 @@  struct physical_ctx {
     const struct simap *ct_zones;
     enum mf_field_id mff_ovn_geneve;
     struct shash *local_bindings;
-    struct hmapx *ct_updated_datapaths;
 };
 
 void physical_register_ovs_idl(struct ovsdb_idl *);
 void physical_run(struct physical_ctx *,
                   struct ovn_desired_flow_table *);
 void physical_clear_unassoc_flows_with_db(struct ovn_desired_flow_table *);
-void physical_clear_dp_flows(struct physical_ctx *p_ctx,
-                             struct hmapx *ct_updated_datapaths,
-                             struct ovn_desired_flow_table *flow_table);
 void physical_handle_port_binding_changes(struct physical_ctx *,
                                           struct ovn_desired_flow_table *);
 void physical_handle_mc_group_changes(struct physical_ctx *,