diff mbox series

[ovs-dev,5/5] northd: Add incremental processing for NB port groups.

Message ID 169167151736.2447623.4735504724454800717.stgit@dceara.remote.csb
State Changes Requested
Headers show
Series Add port group incremental processing in northd. | expand

Checks

Context Check Description
ovsrobot/apply-robot success apply and check: success
ovsrobot/github-robot-_Build_and_Test fail github build: failed
ovsrobot/github-robot-_ovn-kubernetes fail github build: failed

Commit Message

Dumitru Ceara Aug. 10, 2023, 12:45 p.m. UTC
It's similar to the processing we do for address sets.  There's a bit
more mechanics involved due to the fact that we need to split NB port
groups per datapath.

We currently only partially implement incremental processing of
port_group changes in the lflow node.  That is, we deal with the case
when the sets of "switches per port group" doesn't change.  In that
specific case ACL lflows don't need to be reprocessed.

In a synthetic benchmark that created (in this order):
- 500 switches
- 2000 port groups
- 4 ACLs per port group
- 10000 ports distributed equally between the switches and port groups

we measured the following ovn-northd CPU usage:

  +-------------------------+------------+--------------------+
  | Incremental processing? | --wait=sb? | northd avg cpu (%) |
  +-------------------------+------------+--------------------+
  |           N             |     Y      |        84.2        |
  +-------------------------+------------+--------------------+
  |           Y             |     Y      |        41.5        |
  +-------------------------+------------+--------------------+
  |           N             |     N      |        93.2        |
  +-------------------------+------------+--------------------+
  |           Y             |     N      |        53.6        |
  +-------------------------+------------+--------------------+

where '--wait=sb' set to 'Y'  means the benchmark was waiting for the
port and port group operations to be propagated to the Southbound DB
before continuing to the next operation.

Reported-at: https://bugzilla.redhat.com/show_bug.cgi?id=2228162
Signed-off-by: Dumitru Ceara <dceara@redhat.com>
---
 northd/en-lflow.c        |   17 ++
 northd/en-lflow.h        |    1 
 northd/en-port-group.c   |  451 ++++++++++++++++++++++++++++++++++++++++------
 northd/en-port-group.h   |   36 +++-
 northd/inc-proc-northd.c |   13 +
 northd/ovn-northd.c      |    4 
 tests/ovn-northd.at      |  246 +++++++++++++++++++++++++
 7 files changed, 708 insertions(+), 60 deletions(-)

Comments

Ales Musil Aug. 22, 2023, 6:58 a.m. UTC | #1
On Thu, Aug 10, 2023 at 2:45 PM Dumitru Ceara <dceara@redhat.com> wrote:

> It's similar to the processing we do for address sets.  There's a bit
> more mechanics involved due to the fact that we need to split NB port
> groups per datapath.
>
> We currently only partially implement incremental processing of
> port_group changes in the lflow node.  That is, we deal with the case
> when the sets of "switches per port group" doesn't change.  In that
> specific case ACL lflows don't need to be reprocessed.
>
> In a synthetic benchmark that created (in this order):
> - 500 switches
> - 2000 port groups
> - 4 ACLs per port group
> - 10000 ports distributed equally between the switches and port groups
>
> we measured the following ovn-northd CPU usage:
>
>   +-------------------------+------------+--------------------+
>   | Incremental processing? | --wait=sb? | northd avg cpu (%) |
>   +-------------------------+------------+--------------------+
>   |           N             |     Y      |        84.2        |
>   +-------------------------+------------+--------------------+
>   |           Y             |     Y      |        41.5        |
>   +-------------------------+------------+--------------------+
>   |           N             |     N      |        93.2        |
>   +-------------------------+------------+--------------------+
>   |           Y             |     N      |        53.6        |
>   +-------------------------+------------+--------------------+
>
> where '--wait=sb' set to 'Y'  means the benchmark was waiting for the
> port and port group operations to be propagated to the Southbound DB
> before continuing to the next operation.
>
> Reported-at: https://bugzilla.redhat.com/show_bug.cgi?id=2228162
> Signed-off-by: Dumitru Ceara <dceara@redhat.com>
> ---
>

Hi Dumitru,

I have a couple of comments down below.

 northd/en-lflow.c        |   17 ++
>  northd/en-lflow.h        |    1
>  northd/en-port-group.c   |  451
> ++++++++++++++++++++++++++++++++++++++++------
>  northd/en-port-group.h   |   36 +++-
>  northd/inc-proc-northd.c |   13 +
>  northd/ovn-northd.c      |    4
>  tests/ovn-northd.at      |  246 +++++++++++++++++++++++++
>  7 files changed, 708 insertions(+), 60 deletions(-)
>
> diff --git a/northd/en-lflow.c b/northd/en-lflow.c
> index 7f6a7872b2..1321f79036 100644
> --- a/northd/en-lflow.c
> +++ b/northd/en-lflow.c
> @@ -119,6 +119,23 @@ lflow_northd_handler(struct engine_node *node,
>      return true;
>  }
>
> +bool
> +lflow_port_group_handler(struct engine_node *node, void *data OVS_UNUSED)
> +{
> +    struct port_group_data *pg_data =
> +        engine_get_input_data("port_group", node);
> +
> +    /* If the set of switches per port group didn't change then there's no
> +     * need to reprocess lflows.  Otherwise, there might be a need to add
> +     * port-group ACLs to new switches. */
> +    if (!pg_data->ls_port_groups_sets_unchanged) {
> +        return false;
> +    }
> +
> +    engine_set_node_state(node, EN_UPDATED);
> +    return true;
> +}
> +
>  void *en_lflow_init(struct engine_node *node OVS_UNUSED,
>                       struct engine_arg *arg OVS_UNUSED)
>  {
> diff --git a/northd/en-lflow.h b/northd/en-lflow.h
> index 5e3fbc25e3..5417b2faff 100644
> --- a/northd/en-lflow.h
> +++ b/northd/en-lflow.h
> @@ -13,5 +13,6 @@ void en_lflow_run(struct engine_node *node, void *data);
>  void *en_lflow_init(struct engine_node *node, struct engine_arg *arg);
>  void en_lflow_cleanup(void *data);
>  bool lflow_northd_handler(struct engine_node *, void *data);
> +bool lflow_port_group_handler(struct engine_node *, void *data);
>
>  #endif /* EN_LFLOW_H */
> diff --git a/northd/en-port-group.c b/northd/en-port-group.c
> index 2c36410246..6902695a01 100644
> --- a/northd/en-port-group.c
> +++ b/northd/en-port-group.c
> @@ -33,15 +33,46 @@ static struct ls_port_group *ls_port_group_create(
>  static void ls_port_group_destroy(struct ls_port_group_table *,
>                                    struct ls_port_group *);
>
> +static bool ls_port_group_process(
> +    struct ls_port_group_table *,
> +    struct port_group_to_ls_table *,
> +    const struct hmap *ls_ports,
> +    const struct nbrec_port_group *,
> +    struct hmapx *updated_ls_port_groups
> +);
> +
> +static void ls_port_group_record_clear(
> +    struct ls_port_group_table *,
> +    struct port_group_to_ls *,
> +    struct hmapx *updated_ls_port_groups);
> +static void ls_port_group_record_prune(struct ls_port_group *);
> +
>  static struct ls_port_group_record *ls_port_group_record_add(
>      struct ls_port_group *,
>      const struct nbrec_port_group *,
>      const char *port_name);
>
> +static struct ls_port_group_record *ls_port_group_record_find(
> +    struct ls_port_group *, const struct nbrec_port_group *nb_pg);
> +
>  static void ls_port_group_record_destroy(
>      struct ls_port_group *,
>      struct ls_port_group_record *);
>
> +static struct port_group_to_ls *port_group_to_ls_create(
> +    struct port_group_to_ls_table *,
> +    const struct nbrec_port_group *);
> +static void port_group_to_ls_destroy(struct port_group_to_ls_table *,
> +                                     struct port_group_to_ls *);
> +
> +static void update_sb_port_group(struct sorted_array *nb_ports,
> +                                 const struct sbrec_port_group *sb_pg);
> +static void sync_port_group(struct ovsdb_idl_txn *, const char
> *sb_pg_name,
> +                            struct sorted_array *ports,
> +                            struct shash *sb_port_groups);
> +static const struct sbrec_port_group *sb_port_group_lookup_by_name(
> +    struct ovsdb_idl_index *sbrec_port_group_by_name, const char *name);
> +
>  void
>  ls_port_group_table_init(struct ls_port_group_table *table)
>  {
> @@ -82,39 +113,16 @@ ls_port_group_table_find(const struct
> ls_port_group_table *table,
>  }
>
>  void
> -ls_port_group_table_build(struct ls_port_group_table *ls_port_groups,
> -                          const struct nbrec_port_group_table *pg_table,
> -                          const struct hmap *ls_ports)
> +ls_port_group_table_build(
> +    struct ls_port_group_table *ls_port_groups,
> +    struct port_group_to_ls_table *port_group_to_switches,
> +    const struct nbrec_port_group_table *pg_table,
> +    const struct hmap *ls_ports)
>  {
>      const struct nbrec_port_group *nb_pg;
>      NBREC_PORT_GROUP_TABLE_FOR_EACH (nb_pg, pg_table) {
> -        for (size_t i = 0; i < nb_pg->n_ports; i++) {
> -            const char *port_name = nb_pg->ports[i]->name;
> -            const struct ovn_datapath *od =
> -                northd_get_datapath_for_port(ls_ports, port_name);
> -
> -            if (!od) {
> -                static struct vlog_rate_limit rl =
> VLOG_RATE_LIMIT_INIT(1, 1);
> -                VLOG_ERR_RL(&rl, "lport %s in port group %s not found.",
> -                            port_name, nb_pg->name);
> -                continue;
> -            }
> -
> -            if (!od->nbs) {
> -                static struct vlog_rate_limit rl =
> VLOG_RATE_LIMIT_INIT(1, 1);
> -                VLOG_WARN_RL(&rl, "lport %s in port group %s has no
> lswitch.",
> -                             nb_pg->ports[i]->name,
> -                             nb_pg->name);
> -                continue;
> -            }
> -
> -            struct ls_port_group *ls_pg =
> -                ls_port_group_table_find(ls_port_groups, od->nbs);
> -            if (!ls_pg) {
> -                ls_pg = ls_port_group_create(ls_port_groups, od->nbs,
> od->sb);
> -            }
> -            ls_port_group_record_add(ls_pg, nb_pg, port_name);
> -        }
> +        ls_port_group_process(ls_port_groups, port_group_to_switches,
> +                              ls_ports, nb_pg, NULL);
>      }
>  }
>
> @@ -145,18 +153,11 @@ ls_port_group_table_sync(
>              get_sb_port_group_name(ls_pg_rec->nb_pg->name,
>                                     ls_pg->sb_datapath_key,
>                                     &sb_name);
> -            sb_port_group = shash_find_and_delete(&sb_port_groups,
> -                                                  ds_cstr(&sb_name));
> -            if (!sb_port_group) {
> -                sb_port_group = sbrec_port_group_insert(ovnsb_txn);
> -                sbrec_port_group_set_name(sb_port_group,
> ds_cstr(&sb_name));
> -            }
> -
> -            const char **nb_port_names = sset_array(&ls_pg_rec->ports);
> -            sbrec_port_group_set_ports(sb_port_group,
> -                                       nb_port_names,
> -                                       sset_count(&ls_pg_rec->ports));
> -            free(nb_port_names);
> +            struct sorted_array ports =
> +                sorted_array_from_sset(&ls_pg_rec->ports);
> +            sync_port_group(ovnsb_txn, ds_cstr(&sb_name),
> +                            &ports, &sb_port_groups);
> +            sorted_array_destroy(&ports);
>          }
>      }
>      ds_destroy(&sb_name);
> @@ -201,31 +202,165 @@ ls_port_group_destroy(struct ls_port_group_table
> *ls_port_groups,
>      }
>  }
>
> +/* Process a NB.Port_Group record and stores any updated ls_port_groups
> + * in updated_ls_port_groups.  Returns true if a new ls_port_group had
> + * to be created or destroyed.
> + */
> +static bool
> +ls_port_group_process(struct ls_port_group_table *ls_port_groups,
> +                      struct port_group_to_ls_table
> *port_group_to_switches,
> +                      const struct hmap *ls_ports,
> +                      const struct nbrec_port_group *nb_pg,
> +                      struct hmapx *updated_ls_port_groups)
> +{
> +    struct hmapx cleared_ls_port_groups =
> +        HMAPX_INITIALIZER(&cleared_ls_port_groups);
> +    bool ls_port_group_created = false;
> +
> +    struct port_group_to_ls *pg_ls =
> +        port_group_to_ls_table_find(port_group_to_switches, nb_pg);
> +    if (!pg_ls) {
> +        pg_ls = port_group_to_ls_create(port_group_to_switches, nb_pg);
> +    } else {
> +        /* Clear all old records corresponding to this port group; we'll
> +         * reprocess it below. */
> +        ls_port_group_record_clear(ls_port_groups, pg_ls,
> +                                   &cleared_ls_port_groups);
> +    }
> +
> +    for (size_t i = 0; i < nb_pg->n_ports; i++) {
> +        const char *port_name = nb_pg->ports[i]->name;
> +        const struct ovn_datapath *od =
> +            northd_get_datapath_for_port(ls_ports, port_name);
> +
> +        if (!od) {
> +            static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
> +            VLOG_ERR_RL(&rl, "lport %s in port group %s not found.",
> +                        port_name, nb_pg->name);
> +            continue;
> +        }
> +
> +        if (!od->nbs) {
> +            static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
> +            VLOG_WARN_RL(&rl, "lport %s in port group %s has no lswitch.",
> +                         nb_pg->ports[i]->name,
> +                         nb_pg->name);
> +            continue;
> +        }
> +
> +        struct ls_port_group *ls_pg =
> +            ls_port_group_table_find(ls_port_groups, od->nbs);
> +        if (!ls_pg) {
> +            ls_pg = ls_port_group_create(ls_port_groups, od->nbs, od->sb);
> +            ls_port_group_created = true;
> +        }
> +        ls_port_group_record_add(ls_pg, nb_pg, port_name);
> +        hmapx_add(&pg_ls->switches,
> +                  CONST_CAST(struct nbrec_logical_switch *, od->nbs));
> +        if (updated_ls_port_groups) {
> +            hmapx_add(updated_ls_port_groups, ls_pg);
> +        }
> +    }
> +
> +    bool ls_port_group_destroyed = false;
> +    struct hmapx_node *node;
> +    HMAPX_FOR_EACH (node, &cleared_ls_port_groups) {
> +        struct ls_port_group *ls_pg = node->data;
> +
> +        ls_port_group_record_prune(ls_pg);
> +
> +        if (hmap_is_empty(&ls_pg->nb_pgs)) {
> +            ls_port_group_destroy(ls_port_groups, ls_pg);
> +            ls_port_group_destroyed = true;
> +        }
> +    }
> +    hmapx_destroy(&cleared_ls_port_groups);
> +
> +    return ls_port_group_created || ls_port_group_destroyed;
> +}
> +
> +/* Destroys all the struct ls_port_group_record that might be associated
> to
> + * northbound database logical switches.  Stores ls_port_groups that
> became
> + * were updated in the 'updated_ls_port_groups' map.
> + */
> +static void
> +ls_port_group_record_clear(struct ls_port_group_table *ls_port_groups,
> +                           struct port_group_to_ls *pg_ls,
> +                           struct hmapx *cleared_ls_port_groups)
> +{
> +    struct hmapx_node *node;
> +
> +    HMAPX_FOR_EACH (node, &pg_ls->switches) {
> +        const struct nbrec_logical_switch *nbs = node->data;
> +
> +        struct ls_port_group *ls_pg =
> +            ls_port_group_table_find(ls_port_groups, nbs);
> +        if (!ls_pg) {
> +            continue;
> +        }
> +
> +        /* Clear ports in the port group record. */
> +        struct ls_port_group_record *ls_pg_rec =
> +            ls_port_group_record_find(ls_pg, pg_ls->nb_pg);
> +        if (!ls_pg_rec) {
> +            continue;
> +        }
> +
> +        sset_clear(&ls_pg_rec->ports);
> +        hmapx_add(cleared_ls_port_groups, ls_pg);
> +    }
> +}
> +
> +static void
> +ls_port_group_record_prune(struct ls_port_group *ls_pg)
> +{
> +    struct ls_port_group_record *ls_pg_rec;
> +
> +    HMAP_FOR_EACH_SAFE (ls_pg_rec, key_node, &ls_pg->nb_pgs) {
> +        if (sset_is_empty(&ls_pg_rec->ports)) {
> +            ls_port_group_record_destroy(ls_pg, ls_pg_rec);
> +        }
> +    }
> +}
> +
>  static struct ls_port_group_record *
>  ls_port_group_record_add(struct ls_port_group *ls_pg,
>                           const struct nbrec_port_group *nb_pg,
>                           const char *port_name)
>  {
> -    struct ls_port_group_record *ls_pg_rec = NULL;
> +    struct ls_port_group_record *ls_pg_rec =
> +        ls_port_group_record_find(ls_pg, nb_pg);
>      size_t hash = uuid_hash(&nb_pg->header_.uuid);
>
> -    HMAP_FOR_EACH_WITH_HASH (ls_pg_rec, key_node, hash, &ls_pg->nb_pgs) {
> -        if (ls_pg_rec->nb_pg == nb_pg) {
> -            goto done;
> -        }
> +    if (!ls_pg_rec) {
> +        ls_pg_rec = xzalloc(sizeof *ls_pg_rec);
>

nit: No need for zeroed alloc as all the fields are immediately overwritten.

+        *ls_pg_rec = (struct ls_port_group_record) {
> +            .nb_pg = nb_pg,
> +            .ports = SSET_INITIALIZER(&ls_pg_rec->ports),
> +        };
> +        hmap_insert(&ls_pg->nb_pgs, &ls_pg_rec->key_node, hash);
>      }
>
> -    ls_pg_rec = xzalloc(sizeof *ls_pg_rec);
> -    *ls_pg_rec = (struct ls_port_group_record) {
> -        .nb_pg = nb_pg,
> -        .ports = SSET_INITIALIZER(&ls_pg_rec->ports),
> -    };
> -    hmap_insert(&ls_pg->nb_pgs, &ls_pg_rec->key_node, hash);
> -done:
>      sset_add(&ls_pg_rec->ports, port_name);
>      return ls_pg_rec;
>  }
>
> +static struct ls_port_group_record *
> +ls_port_group_record_find(struct ls_port_group *ls_pg,
> +                          const struct nbrec_port_group *nb_pg)
> +{
> +    size_t hash = uuid_hash(&nb_pg->header_.uuid);
> +    struct ls_port_group_record *ls_pg_rec;
> +
> +    HMAP_FOR_EACH_WITH_HASH (ls_pg_rec, key_node, hash, &ls_pg->nb_pgs) {
> +        if (ls_pg_rec->nb_pg == nb_pg) {
> +            return ls_pg_rec;
> +        }
> +    }
> +    return NULL;
> +}
> +
> +
>  static void
>  ls_port_group_record_destroy(struct ls_port_group *ls_pg,
>                               struct ls_port_group_record *ls_pg_rec)
> @@ -237,6 +372,71 @@ ls_port_group_record_destroy(struct ls_port_group
> *ls_pg,
>      }
>  }
>
> +void
> +port_group_to_ls_table_init(struct port_group_to_ls_table *table)
> +{
> +    *table = (struct port_group_to_ls_table) {
> +        .entries = HMAP_INITIALIZER(&table->entries),
> +    };
> +}
> +
> +void
> +port_group_to_ls_table_clear(struct port_group_to_ls_table *table)
> +{
> +    struct port_group_to_ls *pg_ls;
> +    HMAP_FOR_EACH_SAFE (pg_ls, key_node, &table->entries) {
> +        port_group_to_ls_destroy(table, pg_ls);
> +    }
> +}
> +
> +void
> +port_group_to_ls_table_destroy(struct port_group_to_ls_table *table)
> +{
> +    port_group_to_ls_table_clear(table);
> +    hmap_destroy(&table->entries);
> +}
> +
> +struct port_group_to_ls *
> +port_group_to_ls_table_find(const struct port_group_to_ls_table *table,
> +                            const struct nbrec_port_group *nb_pg)
> +{
> +    struct port_group_to_ls *pg_ls;
> +
> +    HMAP_FOR_EACH_WITH_HASH (pg_ls, key_node,
> uuid_hash(&nb_pg->header_.uuid),
> +                             &table->entries) {
>

We should move the uuid_hash call outside the loop.


> +        if (nb_pg == pg_ls->nb_pg) {
> +            return pg_ls;
> +        }
> +    }
> +    return NULL;
> +}
> +
> +static struct port_group_to_ls *
> +port_group_to_ls_create(struct port_group_to_ls_table *table,
> +                        const struct nbrec_port_group *nb_pg)
> +{
> +    struct port_group_to_ls *pg_ls = xmalloc(sizeof *pg_ls);
> +
> +    *pg_ls = (struct port_group_to_ls) {
> +        .nb_pg = nb_pg,
> +        .switches = HMAPX_INITIALIZER(&pg_ls->switches),
> +    };
> +    hmap_insert(&table->entries, &pg_ls->key_node,
> +                uuid_hash(&nb_pg->header_.uuid));
> +    return pg_ls;
> +}
> +
> +static void
> +port_group_to_ls_destroy(struct port_group_to_ls_table *table,
> +                         struct port_group_to_ls *pg_ls)
> +{
> +    if (pg_ls) {
> +        hmapx_destroy(&pg_ls->switches);
> +        hmap_remove(&table->entries, &pg_ls->key_node);
> +        free(pg_ls);
> +    }
> +}
> +
>  /* Incremental processing implementation. */
>  static struct port_group_input
>  port_group_get_input_data(struct engine_node *node)
> @@ -259,6 +459,7 @@ en_port_group_init(struct engine_node *node OVS_UNUSED,
>      struct port_group_data *pg_data = xmalloc(sizeof *pg_data);
>
>      ls_port_group_table_init(&pg_data->ls_port_groups);
> +    port_group_to_ls_table_init(&pg_data->port_groups_to_ls);
>      return pg_data;
>  }
>
> @@ -268,6 +469,15 @@ en_port_group_cleanup(void *data_)
>      struct port_group_data *data = data_;
>
>      ls_port_group_table_destroy(&data->ls_port_groups);
> +    port_group_to_ls_table_destroy(&data->port_groups_to_ls);
> +}
> +
> +void
> +en_port_group_clear_tracked_data(void *data_)
> +{
> +    struct port_group_data *data = data_;
> +
> +    data->ls_port_groups_sets_unchanged = false;
>  }
>
>  void
> @@ -280,7 +490,10 @@ en_port_group_run(struct engine_node *node, void
> *data_)
>      stopwatch_start(PORT_GROUP_RUN_STOPWATCH_NAME, time_msec());
>
>      ls_port_group_table_clear(&data->ls_port_groups);
> +    port_group_to_ls_table_clear(&data->port_groups_to_ls);
> +
>      ls_port_group_table_build(&data->ls_port_groups,
> +                              &data->port_groups_to_ls,
>                                input_data.nbrec_port_group_table,
>                                input_data.ls_ports);
>
> @@ -291,3 +504,133 @@ en_port_group_run(struct engine_node *node, void
> *data_)
>      stopwatch_stop(PORT_GROUP_RUN_STOPWATCH_NAME, time_msec());
>      engine_set_node_state(node, EN_UPDATED);
>  }
> +
> +bool
> +port_group_nb_port_group_handler(struct engine_node *node, void *data_)
> +{
> +    struct port_group_input input_data = port_group_get_input_data(node);
> +    struct port_group_data *data = data_;
> +    bool success = true;
> +
> +    const struct nbrec_port_group_table *nb_pg_table =
> +        EN_OVSDB_GET(engine_get_input("NB_port_group", node));
> +    const struct nbrec_port_group *nb_pg;
> +
> +    /* Return false if a port group is created or deleted.
> +     * Handle I-P for only updated port groups. */
> +    NBREC_PORT_GROUP_TABLE_FOR_EACH_TRACKED (nb_pg, nb_pg_table) {
> +        if (nbrec_port_group_is_new(nb_pg) ||
> +                nbrec_port_group_is_deleted(nb_pg)) {
> +            return false;
> +        }
> +    }
> +
> +    struct hmapx updated_ls_port_groups =
> +        HMAPX_INITIALIZER(&updated_ls_port_groups);
> +
> +    NBREC_PORT_GROUP_TABLE_FOR_EACH_TRACKED (nb_pg, nb_pg_table) {
> +        /* Newly created port groups can't be incrementally processed;
> +         * the rest yes. */
> +        if (ls_port_group_process(&data->ls_port_groups,
> +                                  &data->port_groups_to_ls,
> +                                  input_data.ls_ports,
> +                                  nb_pg, &updated_ls_port_groups)) {
> +            success = false;
> +            break;
> +        }
> +    }
> +
> +    /* If changes have been successfully processed incrementally then
> update
> +     * the SB too. */
> +    if (success) {
> +        struct ovsdb_idl_index *sbrec_port_group_by_name =
> +            engine_ovsdb_node_get_index(
> +                    engine_get_input("SB_port_group", node),
> +                    "sbrec_port_group_by_name");
> +        struct ds sb_pg_name = DS_EMPTY_INITIALIZER;
> +
> +        struct hmapx_node *updated_node;
> +        HMAPX_FOR_EACH (updated_node, &updated_ls_port_groups) {
> +            const struct ls_port_group *ls_pg = updated_node->data;
> +            struct ls_port_group_record *ls_pg_rec;
> +
> +            HMAP_FOR_EACH (ls_pg_rec, key_node, &ls_pg->nb_pgs) {
> +                get_sb_port_group_name(ls_pg_rec->nb_pg->name,
> +                                        ls_pg->sb_datapath_key,
> +                                        &sb_pg_name);
> +
> +                const struct sbrec_port_group *sb_pg =
> +                    sb_port_group_lookup_by_name(sbrec_port_group_by_name,
> +                                                 ds_cstr(&sb_pg_name));
> +                if (!sb_pg) {
> +                    success = false;
> +                    break;
> +                }
> +                struct sorted_array nb_ports =
> +                    sorted_array_from_sset(&ls_pg_rec->ports);
> +                update_sb_port_group(&nb_ports, sb_pg);
> +                sorted_array_destroy(&nb_ports);
> +            }
> +        }
> +        ds_destroy(&sb_pg_name);
> +    }
> +
> +    data->ls_port_groups_sets_unchanged = success;
> +    engine_set_node_state(node, EN_UPDATED);
> +    hmapx_destroy(&updated_ls_port_groups);
> +    return success;
> +}
> +
> +static void
> +sb_port_group_apply_diff(const void *arg, const char *item, bool add)
> +{
> +    const struct sbrec_port_group *pg = arg;
> +    if (add) {
> +        sbrec_port_group_update_ports_addvalue(pg, item);
> +    } else {
> +        sbrec_port_group_update_ports_delvalue(pg, item);
> +    }
> +}
> +
> +static void
> +update_sb_port_group(struct sorted_array *nb_ports,
> +                     const struct sbrec_port_group *sb_pg)
> +{
> +    struct sorted_array sb_ports = sorted_array_from_dbrec(sb_pg, ports);
> +    sorted_array_apply_diff(nb_ports, &sb_ports,
> +                            sb_port_group_apply_diff, sb_pg);
> +    sorted_array_destroy(&sb_ports);
> +}
> +
> +static void
> +sync_port_group(struct ovsdb_idl_txn *ovnsb_txn, const char *sb_pg_name,
> +                struct sorted_array *ports,
> +                struct shash *sb_port_groups)
> +{
> +    const struct sbrec_port_group *sb_port_group =
> +        shash_find_and_delete(sb_port_groups, sb_pg_name);
> +    if (!sb_port_group) {
> +        sb_port_group = sbrec_port_group_insert(ovnsb_txn);
> +        sbrec_port_group_set_name(sb_port_group, sb_pg_name);
> +        sbrec_port_group_set_ports(sb_port_group, ports->arr, ports->n);
> +    } else {
> +        update_sb_port_group(ports, sb_port_group);
> +    }
> +}
> +
> +/* Finds and returns the port group set with the given 'name', or NULL
> + * if no such port group exists. */
> +static const struct sbrec_port_group *
> +sb_port_group_lookup_by_name(struct ovsdb_idl_index
> *sbrec_port_group_by_name,
> +                             const char *name)
> +{
> +    struct sbrec_port_group *target = sbrec_port_group_index_init_row(
> +        sbrec_port_group_by_name);
> +    sbrec_port_group_index_set_name(target, name);
> +
> +    struct sbrec_port_group *retval = sbrec_port_group_index_find(
> +        sbrec_port_group_by_name, target);
> +
> +    sbrec_port_group_index_destroy_row(target);
> +    return retval;
> +}
> diff --git a/northd/en-port-group.h b/northd/en-port-group.h
> index 5cbf6c6c4a..c3975f64ee 100644
> --- a/northd/en-port-group.h
> +++ b/northd/en-port-group.h
> @@ -18,6 +18,7 @@
>
>  #include <stdint.h>
>
> +#include "lib/hmapx.h"
>  #include "lib/inc-proc-eng.h"
>  #include "lib/ovn-nb-idl.h"
>  #include "lib/ovn-sb-idl.h"
> @@ -54,9 +55,33 @@ struct ls_port_group *ls_port_group_table_find(
>      const struct ls_port_group_table *,
>      const struct nbrec_logical_switch *);
>
> -void ls_port_group_table_build(struct ls_port_group_table *ls_port_groups,
> -                               const struct nbrec_port_group_table *,
> -                               const struct hmap *ls_ports);
> +/* Per port group map of datapaths with ports in the group. */
> +struct port_group_to_ls_table {
> +    struct hmap entries; /* Stores struct port_group_to_ls. */
> +};
> +
> +struct port_group_to_ls {
> +    struct hmap_node key_node; /* Index on 'pg->header_.uuid'. */
> +
> +    const struct nbrec_port_group *nb_pg;
> +
> +    /* Map of 'struct nbrec_logical_switch *' with ports in the group. */
> +    struct hmapx switches;
> +};
> +
> +void port_group_to_ls_table_init(struct port_group_to_ls_table *);
> +void port_group_to_ls_table_clear(struct port_group_to_ls_table *);
> +void port_group_to_ls_table_destroy(struct port_group_to_ls_table *);
> +
> +struct port_group_to_ls *port_group_to_ls_table_find(
> +    const struct port_group_to_ls_table *,
> +    const struct nbrec_port_group *);
> +
> +void ls_port_group_table_build(
> +    struct ls_port_group_table *ls_port_groups,
> +    struct port_group_to_ls_table *port_group_to_switches,
> +    const struct nbrec_port_group_table *,
> +    const struct hmap *ls_ports);
>  void ls_port_group_table_sync(const struct ls_port_group_table
> *ls_port_groups,
>                                const struct sbrec_port_group_table *,
>                                struct ovsdb_idl_txn *ovnsb_txn);
> @@ -75,10 +100,15 @@ struct port_group_input {
>
>  struct port_group_data {
>      struct ls_port_group_table ls_port_groups;
> +    struct port_group_to_ls_table port_groups_to_ls;
> +    bool ls_port_groups_sets_unchanged;
>  };
>
>  void *en_port_group_init(struct engine_node *, struct engine_arg *);
>  void en_port_group_cleanup(void *data);
> +void en_port_group_clear_tracked_data(void *data);
>  void en_port_group_run(struct engine_node *, void *data);
>
> +bool port_group_nb_port_group_handler(struct engine_node *, void *data);
> +
>  #endif /* EN_PORT_GROUP_H */
> diff --git a/northd/inc-proc-northd.c b/northd/inc-proc-northd.c
> index 6d5f9e8d16..bd598ba5e2 100644
> --- a/northd/inc-proc-northd.c
> +++ b/northd/inc-proc-northd.c
> @@ -137,7 +137,7 @@ static ENGINE_NODE(mac_binding_aging_waker,
> "mac_binding_aging_waker");
>  static ENGINE_NODE(northd_output, "northd_output");
>  static ENGINE_NODE(sync_to_sb, "sync_to_sb");
>  static ENGINE_NODE(sync_to_sb_addr_set, "sync_to_sb_addr_set");
> -static ENGINE_NODE(port_group, "port_group");
> +static ENGINE_NODE_WITH_CLEAR_TRACK_DATA(port_group, "port_group");
>  static ENGINE_NODE(fdb_aging, "fdb_aging");
>  static ENGINE_NODE(fdb_aging_waker, "fdb_aging_waker");
>
> @@ -193,7 +193,7 @@ void inc_proc_northd_init(struct ovsdb_idl_loop *nb,
>      engine_add_input(&en_lflow, &en_sb_multicast_group, NULL);
>      engine_add_input(&en_lflow, &en_sb_igmp_group, NULL);
>      engine_add_input(&en_lflow, &en_northd, lflow_northd_handler);
> -    engine_add_input(&en_lflow, &en_port_group, NULL);
> +    engine_add_input(&en_lflow, &en_port_group, lflow_port_group_handler);
>
>      engine_add_input(&en_sync_to_sb_addr_set, &en_nb_address_set,
>                       sync_to_sb_addr_set_nb_address_set_handler);
> @@ -202,7 +202,8 @@ void inc_proc_northd_init(struct ovsdb_idl_loop *nb,
>      engine_add_input(&en_sync_to_sb_addr_set, &en_northd, NULL);
>      engine_add_input(&en_sync_to_sb_addr_set, &en_sb_address_set, NULL);
>
> -    engine_add_input(&en_port_group, &en_nb_port_group, NULL);
> +    engine_add_input(&en_port_group, &en_nb_port_group,
> +                     port_group_nb_port_group_handler);
>      engine_add_input(&en_port_group, &en_sb_port_group, NULL);
>      /* No need for an explicit handler for northd changes.  Port changes
>       * that affect port_groups trigger updates to the NB.Port_Group
> @@ -287,6 +288,12 @@ void inc_proc_northd_init(struct ovsdb_idl_loop *nb,
>                                  "sbrec_address_set_by_name",
>                                  sbrec_address_set_by_name);
>
> +    struct ovsdb_idl_index *sbrec_port_group_by_name
> +        = ovsdb_idl_index_create1(sb->idl, &sbrec_port_group_col_name);
> +    engine_ovsdb_node_add_index(&en_sb_port_group,
> +                                "sbrec_port_group_by_name",
> +                                sbrec_port_group_by_name);
> +
>      struct ovsdb_idl_index *sbrec_fdb_by_dp_and_port
>          = ovsdb_idl_index_create2(sb->idl, &sbrec_fdb_col_dp_key,
>                                    &sbrec_fdb_col_port_key);
> diff --git a/northd/ovn-northd.c b/northd/ovn-northd.c
> index 4fa1b039ea..44385d604c 100644
> --- a/northd/ovn-northd.c
> +++ b/northd/ovn-northd.c
> @@ -836,6 +836,10 @@ main(int argc, char *argv[])
>          ovsdb_idl_omit_alert(ovnsb_idl_loop.idl,
>                               &sbrec_multicast_group_columns[i]);
>      }
> +    for (size_t i = 0; i < SBREC_PORT_GROUP_N_COLUMNS; i++) {
> +        ovsdb_idl_omit_alert(ovnsb_idl_loop.idl,
> +                             &sbrec_port_group_columns[i]);
> +    }
>
>      unixctl_command_register("sb-connection-status", "", 0, 0,
>                               ovn_conn_show, ovnsb_idl_loop.idl);
> diff --git a/tests/ovn-northd.at b/tests/ovn-northd.at
> index 1a12513d7a..a04ba2b23f 100644
> --- a/tests/ovn-northd.at
> +++ b/tests/ovn-northd.at
> @@ -8936,6 +8936,252 @@ AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE
> inc-engine/show-stats sync_to_sb_a
>  AT_CLEANUP
>  ])
>
> +OVN_FOR_EACH_NORTHD_NO_HV([
> +AT_SETUP([Port group incremental processing])
> +ovn_start
> +
> +check ovn-nbctl ls-add sw1 \
> +  -- lsp-add sw1 sw1.1     \
> +  -- lsp-add sw1 sw1.2     \
> +  -- lsp-add sw1 sw1.3     \
> +  -- ls-add sw2            \
> +  -- lsp-add sw2 sw2.1     \
> +  -- lsp-add sw2 sw2.2     \
> +  -- lsp-add sw2 sw2.3
> +
> +check ovn-nbctl --wait=sb sync
> +sw1_key=$(fetch_column Datapath_Binding tunnel_key external_ids:name=sw1)
> +sw2_key=$(fetch_column Datapath_Binding tunnel_key external_ids:name=sw2)
> +
> +check_acl_lflows() {
> +AT_CHECK_UNQUOTED([ovn-sbctl lflow-list sw1 | grep ls_in_acl_eval | grep
> eth.src==41:41:41:41:41:41 -c], [ignore], [dnl
> +$1
> +])
> +AT_CHECK_UNQUOTED([ovn-sbctl lflow-list sw1 | grep ls_in_acl_eval | grep
> eth.src==42:42:42:42:42:42 -c], [ignore], [dnl
> +$2
> +])
> +AT_CHECK_UNQUOTED([ovn-sbctl lflow-list sw2 | grep ls_in_acl_eval | grep
> eth.src==41:41:41:41:41:41 -c], [ignore], [dnl
> +$3
> +])
> +AT_CHECK_UNQUOTED([ovn-sbctl lflow-list sw2 | grep ls_in_acl_eval | grep
> eth.src==42:42:42:42:42:42 -c], [ignore], [dnl
> +$4
> +])
> +}
> +
> +AS_BOX([Create new PG1 and PG2])
> +check as northd ovn-appctl -t NORTHD_TYPE inc-engine/clear-stats
> +check ovn-nbctl --wait=sb -- pg-add pg1 -- pg-add pg2
> +dnl The northd node should not recompute, it should handle nb_global
> update
> +dnl though, therefore "compute: 1".
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
> northd], [0], [dnl
> +Node: northd
> +- recompute:            0
> +- compute:              1
> +- abort:                0
> +])
> +dnl The port_group node recomputes every time a NB port group is
> added/deleted.
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
> port_group], [0], [dnl
> +Node: port_group
> +- recompute:            1
> +- compute:              0
> +- abort:                0
> +])
> +dnl The port_group node is an input for the lflow node.  Port_group
> +dnl recompute/compute triggers lflow recompute.
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
> lflow], [0], [dnl
> +Node: lflow
> +- recompute:            1
> +- compute:              0
> +- abort:                0
> +])
> +
> +AS_BOX([Add ACLs on PG1 and PG2])
> +check ovn-nbctl --wait=sb             \
> +  -- acl-add pg1 from-lport 1 eth.src==41:41:41:41:41:41 allow \
> +  -- acl-add pg2 from-lport 1 eth.src==42:42:42:42:42:42 allow
> +
> +AS_BOX([Add one port from the two switches to PG1])
> +check as northd ovn-appctl -t NORTHD_TYPE inc-engine/clear-stats
> +check ovn-nbctl --wait=sb         \
> +  -- pg-set-ports pg1 sw1.1 sw2.1
> +check_column "sw1.1" sb:Port_Group ports name="${sw1_key}_pg1"
> +check_column "sw2.1" sb:Port_Group ports name="${sw2_key}_pg1"
> +
> +dnl The northd node should not recompute, it should handle nb_global
> update
> +dnl though, therefore "compute: 1".
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
> northd], [0], [dnl
> +Node: northd
> +- recompute:            0
> +- compute:              1
> +- abort:                0
> +])
> +dnl The port_group node recomputes also every time a port from a new
> switch
> +dnl is added to the group.
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
> port_group], [0], [dnl
> +Node: port_group
> +- recompute:            1
> +- compute:              0
> +- abort:                0
> +])
> +dnl The port_group node is an input for the lflow node.  Port_group
> +dnl recompute/compute triggers lflow recompute.
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
> lflow], [0], [dnl
> +Node: lflow
> +- recompute:            1
> +- compute:              0
> +- abort:                0
> +])
> +dnl Expect ACL1 on sw1 and sw2
> +check_acl_lflows 1 0 1 0
> +
> +AS_BOX([Add one port from the two switches to PG2])
> +check as northd ovn-appctl -t NORTHD_TYPE inc-engine/clear-stats
> +check ovn-nbctl --wait=sb \
> +  -- pg-set-ports pg2 sw1.2 sw2.2
> +check_column "sw1.1" sb:Port_Group ports name="${sw1_key}_pg1"
> +check_column "sw2.1" sb:Port_Group ports name="${sw2_key}_pg1"
> +check_column "sw1.2" sb:Port_Group ports name="${sw1_key}_pg2"
> +check_column "sw2.2" sb:Port_Group ports name="${sw2_key}_pg2"
> +
> +dnl The northd node should not recompute, it should handle nb_global
> update
> +dnl though, therefore "compute: 1".
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
> northd], [0], [dnl
> +Node: northd
> +- recompute:            0
> +- compute:              1
> +- abort:                0
> +])
> +dnl The port_group node recomputes also every time a port from a new
> switch
> +dnl is added to the group.
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
> port_group], [0], [dnl
> +Node: port_group
> +- recompute:            1
> +- compute:              0
> +- abort:                0
> +])
> +dnl The port_group node is an input for the lflow node.  Port_group
> +dnl recompute/compute triggers lflow recompute (for ACLs).
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
> lflow], [0], [dnl
> +Node: lflow
> +- recompute:            1
> +- compute:              0
> +- abort:                0
> +])
> +dnl Expect both ACLs on sw1 and sw2
> +check_acl_lflows 1 1 1 1
> +
> +AS_BOX([Add one more port from the two switches to PG1 and PG2])
> +check as northd ovn-appctl -t NORTHD_TYPE inc-engine/clear-stats
> +check ovn-nbctl --wait=sb                     \
> +  -- pg-set-ports pg1 sw1.1 sw2.1 sw1.3 sw2.3 \
> +  -- pg-set-ports pg2 sw1.2 sw2.2 sw1.3 sw2.3
> +check_column "sw1.1 sw1.3" sb:Port_Group ports name="${sw1_key}_pg1"
> +check_column "sw2.1 sw2.3" sb:Port_Group ports name="${sw2_key}_pg1"
> +check_column "sw1.2 sw1.3" sb:Port_Group ports name="${sw1_key}_pg2"
> +check_column "sw2.2 sw2.3" sb:Port_Group ports name="${sw2_key}_pg2"
> +
> +dnl The northd node should not recompute, it should handle nb_global
> update
> +dnl though, therefore "compute: 1".
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
> northd], [0], [dnl
> +Node: northd
> +- recompute:            0
> +- compute:              1
> +- abort:                0
> +])
> +dnl We did not change the set of switches a pg is applied to, there
> should be
> +dnl no recompute.
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
> port_group], [0], [dnl
> +Node: port_group
> +- recompute:            0
> +- compute:              1
> +- abort:                0
> +])
> +dnl We did not change the set of switches a pg is applied to, there
> should be
> +dnl no recompute.
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
> lflow], [0], [dnl
> +Node: lflow
> +- recompute:            0
> +- compute:              1
> +- abort:                0
> +])
> +dnl Expect both ACLs on sw1 and sw2
> +check_acl_lflows 1 1 1 1
> +
> +AS_BOX([Remove the last port from PG1 and PG2])
> +check as northd ovn-appctl -t NORTHD_TYPE inc-engine/clear-stats
> +check ovn-nbctl --wait=sb         \
> +  -- pg-set-ports pg1 sw1.1 sw2.1 \
> +  -- pg-set-ports pg2 sw1.2 sw2.2
> +check_column "sw1.1" sb:Port_Group ports name="${sw1_key}_pg1"
> +check_column "sw2.1" sb:Port_Group ports name="${sw2_key}_pg1"
> +check_column "sw1.2" sb:Port_Group ports name="${sw1_key}_pg2"
> +check_column "sw2.2" sb:Port_Group ports name="${sw2_key}_pg2"
> +
> +dnl The northd node should not recompute, it should handle nb_global
> update
> +dnl though, therefore "compute: 1".
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
> northd], [0], [dnl
> +Node: northd
> +- recompute:            0
> +- compute:              1
> +- abort:                0
> +])
> +dnl We did not change the set of switches a pg is applied to, there
> should be
> +dnl no recompute.
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
> port_group], [0], [dnl
> +Node: port_group
> +- recompute:            0
> +- compute:              1
> +- abort:                0
> +])
> +dnl We did not change the set of switches a pg is applied to, there
> should be
> +dnl no recompute.
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
> lflow], [0], [dnl
> +Node: lflow
> +- recompute:            0
> +- compute:              1
> +- abort:                0
> +])
> +dnl Expect both ACLs on sw1 and sw2
> +check_acl_lflows 1 1 1 1
> +
> +AS_BOX([Remove the second port from PG1 and PG2])
> +check as northd ovn-appctl -t NORTHD_TYPE inc-engine/clear-stats
> +check ovn-nbctl --wait=sb         \
> +  -- pg-set-ports pg1 sw1.1 \
> +  -- pg-set-ports pg2 sw1.2
> +check_column "sw1.1" sb:Port_Group ports name="${sw1_key}_pg1"
> +check_column "sw1.2" sb:Port_Group ports name="${sw1_key}_pg2"
> +
> +dnl The northd node should not recompute, it should handle nb_global
> update
> +dnl though, therefore "compute: 1".
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
> northd], [0], [dnl
> +Node: northd
> +- recompute:            0
> +- compute:              1
> +- abort:                0
> +])
> +dnl We did changed the set of switches a pg is applied to, there should be
> +dnl a recompute.
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
> port_group], [0], [dnl
> +Node: port_group
> +- recompute:            1
> +- compute:              0
> +- abort:                0
> +])
> +dnl We did changed the set of switches a pg is applied to, there should be
> +dnl a recompute (for ACLs).
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
> lflow], [0], [dnl
> +Node: lflow
> +- recompute:            1
> +- compute:              0
> +- abort:                0
> +])
> +dnl Expect both ACLs on sw1 and not on sw2.
> +check_acl_lflows 1 1 0 0
> +
> +AT_CLEANUP
> +])
> +
>  OVN_FOR_EACH_NORTHD([
>  AT_SETUP([Check default drop])
>  AT_KEYWORDS([drop])
>
>
With that addressed:

Acked-by: Ales Musil <amusil@redhat.com>
Han Zhou Aug. 24, 2023, 6:18 a.m. UTC | #2
On Thu, Aug 10, 2023 at 5:45 AM Dumitru Ceara <dceara@redhat.com> wrote:
>
> It's similar to the processing we do for address sets.  There's a bit
> more mechanics involved due to the fact that we need to split NB port
> groups per datapath.
>
> We currently only partially implement incremental processing of
> port_group changes in the lflow node.  That is, we deal with the case
> when the sets of "switches per port group" doesn't change.  In that
> specific case ACL lflows don't need to be reprocessed.
>
> In a synthetic benchmark that created (in this order):
> - 500 switches
> - 2000 port groups
> - 4 ACLs per port group
> - 10000 ports distributed equally between the switches and port groups
>
> we measured the following ovn-northd CPU usage:
>
>   +-------------------------+------------+--------------------+
>   | Incremental processing? | --wait=sb? | northd avg cpu (%) |
>   +-------------------------+------------+--------------------+
>   |           N             |     Y      |        84.2        |
>   +-------------------------+------------+--------------------+
>   |           Y             |     Y      |        41.5        |
>   +-------------------------+------------+--------------------+
>   |           N             |     N      |        93.2        |
>   +-------------------------+------------+--------------------+
>   |           Y             |     N      |        53.6        |
>   +-------------------------+------------+--------------------+
>
> where '--wait=sb' set to 'Y'  means the benchmark was waiting for the
> port and port group operations to be propagated to the Southbound DB
> before continuing to the next operation.
>
> Reported-at: https://bugzilla.redhat.com/show_bug.cgi?id=2228162
> Signed-off-by: Dumitru Ceara <dceara@redhat.com>

Thanks Dumitru for the improvement! Please see my comments below.

> ---
>  northd/en-lflow.c        |   17 ++
>  northd/en-lflow.h        |    1
>  northd/en-port-group.c   |  451
++++++++++++++++++++++++++++++++++++++++------
>  northd/en-port-group.h   |   36 +++-
>  northd/inc-proc-northd.c |   13 +
>  northd/ovn-northd.c      |    4
>  tests/ovn-northd.at      |  246 +++++++++++++++++++++++++
>  7 files changed, 708 insertions(+), 60 deletions(-)
>
> diff --git a/northd/en-lflow.c b/northd/en-lflow.c
> index 7f6a7872b2..1321f79036 100644
> --- a/northd/en-lflow.c
> +++ b/northd/en-lflow.c
> @@ -119,6 +119,23 @@ lflow_northd_handler(struct engine_node *node,
>      return true;
>  }
>
> +bool
> +lflow_port_group_handler(struct engine_node *node, void *data OVS_UNUSED)
> +{
> +    struct port_group_data *pg_data =
> +        engine_get_input_data("port_group", node);
> +
> +    /* If the set of switches per port group didn't change then there's
no
> +     * need to reprocess lflows.  Otherwise, there might be a need to add
> +     * port-group ACLs to new switches. */

To be more accurate, the comment should say "there might be a need to
add/delete port-group ACLs to/from switches."

> +    if (!pg_data->ls_port_groups_sets_unchanged) {

nit: it would be a little more natural to name the field as
ls_port_groups_sets_changed and use positive check in the if condition.

> +        return false;
> +    }
> +
> +    engine_set_node_state(node, EN_UPDATED);
> +    return true;
> +}
> +
>  void *en_lflow_init(struct engine_node *node OVS_UNUSED,
>                       struct engine_arg *arg OVS_UNUSED)
>  {
> diff --git a/northd/en-lflow.h b/northd/en-lflow.h
> index 5e3fbc25e3..5417b2faff 100644
> --- a/northd/en-lflow.h
> +++ b/northd/en-lflow.h
> @@ -13,5 +13,6 @@ void en_lflow_run(struct engine_node *node, void *data);
>  void *en_lflow_init(struct engine_node *node, struct engine_arg *arg);
>  void en_lflow_cleanup(void *data);
>  bool lflow_northd_handler(struct engine_node *, void *data);
> +bool lflow_port_group_handler(struct engine_node *, void *data);
>
>  #endif /* EN_LFLOW_H */
> diff --git a/northd/en-port-group.c b/northd/en-port-group.c
> index 2c36410246..6902695a01 100644
> --- a/northd/en-port-group.c
> +++ b/northd/en-port-group.c
> @@ -33,15 +33,46 @@ static struct ls_port_group *ls_port_group_create(
>  static void ls_port_group_destroy(struct ls_port_group_table *,
>                                    struct ls_port_group *);
>
> +static bool ls_port_group_process(
> +    struct ls_port_group_table *,
> +    struct port_group_to_ls_table *,
> +    const struct hmap *ls_ports,
> +    const struct nbrec_port_group *,
> +    struct hmapx *updated_ls_port_groups
> +);

nit: It would be better to use the same coding style as the below
prototype, i.e. don't put the last ");" in a separate line.

> +
> +static void ls_port_group_record_clear(
> +    struct ls_port_group_table *,
> +    struct port_group_to_ls *,
> +    struct hmapx *updated_ls_port_groups);
> +static void ls_port_group_record_prune(struct ls_port_group *);
> +
>  static struct ls_port_group_record *ls_port_group_record_add(
>      struct ls_port_group *,
>      const struct nbrec_port_group *,
>      const char *port_name);
>
> +static struct ls_port_group_record *ls_port_group_record_find(
> +    struct ls_port_group *, const struct nbrec_port_group *nb_pg);
> +
>  static void ls_port_group_record_destroy(
>      struct ls_port_group *,
>      struct ls_port_group_record *);
>
> +static struct port_group_to_ls *port_group_to_ls_create(
> +    struct port_group_to_ls_table *,
> +    const struct nbrec_port_group *);
> +static void port_group_to_ls_destroy(struct port_group_to_ls_table *,
> +                                     struct port_group_to_ls *);
> +
> +static void update_sb_port_group(struct sorted_array *nb_ports,
> +                                 const struct sbrec_port_group *sb_pg);
> +static void sync_port_group(struct ovsdb_idl_txn *, const char
*sb_pg_name,
> +                            struct sorted_array *ports,
> +                            struct shash *sb_port_groups);
> +static const struct sbrec_port_group *sb_port_group_lookup_by_name(
> +    struct ovsdb_idl_index *sbrec_port_group_by_name, const char *name);
> +
>  void
>  ls_port_group_table_init(struct ls_port_group_table *table)
>  {
> @@ -82,39 +113,16 @@ ls_port_group_table_find(const struct
ls_port_group_table *table,
>  }
>
>  void
> -ls_port_group_table_build(struct ls_port_group_table *ls_port_groups,
> -                          const struct nbrec_port_group_table *pg_table,
> -                          const struct hmap *ls_ports)
> +ls_port_group_table_build(
> +    struct ls_port_group_table *ls_port_groups,
> +    struct port_group_to_ls_table *port_group_to_switches,
> +    const struct nbrec_port_group_table *pg_table,
> +    const struct hmap *ls_ports)
>  {
>      const struct nbrec_port_group *nb_pg;
>      NBREC_PORT_GROUP_TABLE_FOR_EACH (nb_pg, pg_table) {
> -        for (size_t i = 0; i < nb_pg->n_ports; i++) {
> -            const char *port_name = nb_pg->ports[i]->name;
> -            const struct ovn_datapath *od =
> -                northd_get_datapath_for_port(ls_ports, port_name);
> -
> -            if (!od) {
> -                static struct vlog_rate_limit rl =
VLOG_RATE_LIMIT_INIT(1, 1);
> -                VLOG_ERR_RL(&rl, "lport %s in port group %s not found.",
> -                            port_name, nb_pg->name);
> -                continue;
> -            }
> -
> -            if (!od->nbs) {
> -                static struct vlog_rate_limit rl =
VLOG_RATE_LIMIT_INIT(1, 1);
> -                VLOG_WARN_RL(&rl, "lport %s in port group %s has no
lswitch.",
> -                             nb_pg->ports[i]->name,
> -                             nb_pg->name);
> -                continue;
> -            }
> -
> -            struct ls_port_group *ls_pg =
> -                ls_port_group_table_find(ls_port_groups, od->nbs);
> -            if (!ls_pg) {
> -                ls_pg = ls_port_group_create(ls_port_groups, od->nbs,
od->sb);
> -            }
> -            ls_port_group_record_add(ls_pg, nb_pg, port_name);
> -        }
> +        ls_port_group_process(ls_port_groups, port_group_to_switches,
> +                              ls_ports, nb_pg, NULL);
>      }
>  }
>
> @@ -145,18 +153,11 @@ ls_port_group_table_sync(
>              get_sb_port_group_name(ls_pg_rec->nb_pg->name,
>                                     ls_pg->sb_datapath_key,
>                                     &sb_name);
> -            sb_port_group = shash_find_and_delete(&sb_port_groups,
> -                                                  ds_cstr(&sb_name));
> -            if (!sb_port_group) {
> -                sb_port_group = sbrec_port_group_insert(ovnsb_txn);
> -                sbrec_port_group_set_name(sb_port_group,
ds_cstr(&sb_name));
> -            }
> -
> -            const char **nb_port_names = sset_array(&ls_pg_rec->ports);
> -            sbrec_port_group_set_ports(sb_port_group,
> -                                       nb_port_names,
> -                                       sset_count(&ls_pg_rec->ports));
> -            free(nb_port_names);
> +            struct sorted_array ports =
> +                sorted_array_from_sset(&ls_pg_rec->ports);
> +            sync_port_group(ovnsb_txn, ds_cstr(&sb_name),
> +                            &ports, &sb_port_groups);
> +            sorted_array_destroy(&ports);
>          }
>      }
>      ds_destroy(&sb_name);
> @@ -201,31 +202,165 @@ ls_port_group_destroy(struct ls_port_group_table
*ls_port_groups,
>      }
>  }
>
> +/* Process a NB.Port_Group record and stores any updated ls_port_groups
> + * in updated_ls_port_groups.  Returns true if a new ls_port_group had
> + * to be created or destroyed.
> + */
> +static bool
> +ls_port_group_process(struct ls_port_group_table *ls_port_groups,
> +                      struct port_group_to_ls_table
*port_group_to_switches,
> +                      const struct hmap *ls_ports,
> +                      const struct nbrec_port_group *nb_pg,
> +                      struct hmapx *updated_ls_port_groups)
> +{
> +    struct hmapx cleared_ls_port_groups =
> +        HMAPX_INITIALIZER(&cleared_ls_port_groups);
> +    bool ls_port_group_created = false;
> +
> +    struct port_group_to_ls *pg_ls =
> +        port_group_to_ls_table_find(port_group_to_switches, nb_pg);
> +    if (!pg_ls) {
> +        pg_ls = port_group_to_ls_create(port_group_to_switches, nb_pg);
> +    } else {
> +        /* Clear all old records corresponding to this port group; we'll
> +         * reprocess it below. */
> +        ls_port_group_record_clear(ls_port_groups, pg_ls,
> +                                   &cleared_ls_port_groups);

When the last port from a LS is removed from a PG, we should remove the LS
from the port_group_to_switches record's "switches" field. I didn't find
this logic in the code. Did I miss anything?

> +    }
> +
> +    for (size_t i = 0; i < nb_pg->n_ports; i++) {
> +        const char *port_name = nb_pg->ports[i]->name;
> +        const struct ovn_datapath *od =
> +            northd_get_datapath_for_port(ls_ports, port_name);
> +
> +        if (!od) {
> +            static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1,
1);
> +            VLOG_ERR_RL(&rl, "lport %s in port group %s not found.",
> +                        port_name, nb_pg->name);
> +            continue;
> +        }
> +
> +        if (!od->nbs) {
> +            static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1,
1);
> +            VLOG_WARN_RL(&rl, "lport %s in port group %s has no
lswitch.",
> +                         nb_pg->ports[i]->name,
> +                         nb_pg->name);
> +            continue;
> +        }
> +
> +        struct ls_port_group *ls_pg =
> +            ls_port_group_table_find(ls_port_groups, od->nbs);
> +        if (!ls_pg) {
> +            ls_pg = ls_port_group_create(ls_port_groups, od->nbs,
od->sb);
> +            ls_port_group_created = true;

What if a ls_pg has existed, and now a new PG is associated with the LS? I
think we should identify such cases, too, and return false.

> +        }
> +        ls_port_group_record_add(ls_pg, nb_pg, port_name);
> +        hmapx_add(&pg_ls->switches,
> +                  CONST_CAST(struct nbrec_logical_switch *, od->nbs));
> +        if (updated_ls_port_groups) {
> +            hmapx_add(updated_ls_port_groups, ls_pg);
> +        }
> +    }
> +
> +    bool ls_port_group_destroyed = false;
> +    struct hmapx_node *node;
> +    HMAPX_FOR_EACH (node, &cleared_ls_port_groups) {
> +        struct ls_port_group *ls_pg = node->data;
> +
> +        ls_port_group_record_prune(ls_pg);
> +
> +        if (hmap_is_empty(&ls_pg->nb_pgs)) {
> +            ls_port_group_destroy(ls_port_groups, ls_pg);
> +            ls_port_group_destroyed = true;

Similar to the above, what if a PG is removed from a ls_pg but the ls_pg
still has some other PGs? Should we return false, too?

> +        }
> +    }
> +    hmapx_destroy(&cleared_ls_port_groups);
> +
> +    return ls_port_group_created || ls_port_group_destroyed;
> +}
> +
> +/* Destroys all the struct ls_port_group_record that might be associated
to
> + * northbound database logical switches.  Stores ls_port_groups that
became
> + * were updated in the 'updated_ls_port_groups' map.

nit:
s/became were/were
s/updated/cleared

> + */
> +static void
> +ls_port_group_record_clear(struct ls_port_group_table *ls_port_groups,
> +                           struct port_group_to_ls *pg_ls,
> +                           struct hmapx *cleared_ls_port_groups)
> +{
> +    struct hmapx_node *node;
> +
> +    HMAPX_FOR_EACH (node, &pg_ls->switches) {
> +        const struct nbrec_logical_switch *nbs = node->data;
> +
> +        struct ls_port_group *ls_pg =
> +            ls_port_group_table_find(ls_port_groups, nbs);
> +        if (!ls_pg) {

Should it be a bug if the ls_pg is not found here? Shall we assert?

> +            continue;
> +        }
> +
> +        /* Clear ports in the port group record. */
> +        struct ls_port_group_record *ls_pg_rec =
> +            ls_port_group_record_find(ls_pg, pg_ls->nb_pg);
> +        if (!ls_pg_rec) {

Same as above.

> +            continue;
> +        }
> +
> +        sset_clear(&ls_pg_rec->ports);
> +        hmapx_add(cleared_ls_port_groups, ls_pg);
> +    }
> +}
> +
> +static void
> +ls_port_group_record_prune(struct ls_port_group *ls_pg)
> +{
> +    struct ls_port_group_record *ls_pg_rec;
> +
> +    HMAP_FOR_EACH_SAFE (ls_pg_rec, key_node, &ls_pg->nb_pgs) {
> +        if (sset_is_empty(&ls_pg_rec->ports)) {
> +            ls_port_group_record_destroy(ls_pg, ls_pg_rec);
> +        }
> +    }
> +}
> +
>  static struct ls_port_group_record *
>  ls_port_group_record_add(struct ls_port_group *ls_pg,
>                           const struct nbrec_port_group *nb_pg,
>                           const char *port_name)
>  {
> -    struct ls_port_group_record *ls_pg_rec = NULL;
> +    struct ls_port_group_record *ls_pg_rec =
> +        ls_port_group_record_find(ls_pg, nb_pg);
>      size_t hash = uuid_hash(&nb_pg->header_.uuid);
>
> -    HMAP_FOR_EACH_WITH_HASH (ls_pg_rec, key_node, hash, &ls_pg->nb_pgs) {
> -        if (ls_pg_rec->nb_pg == nb_pg) {
> -            goto done;
> -        }
> +    if (!ls_pg_rec) {
> +        ls_pg_rec = xzalloc(sizeof *ls_pg_rec);
> +        *ls_pg_rec = (struct ls_port_group_record) {
> +            .nb_pg = nb_pg,
> +            .ports = SSET_INITIALIZER(&ls_pg_rec->ports),
> +        };
> +        hmap_insert(&ls_pg->nb_pgs, &ls_pg_rec->key_node, hash);
>      }
>
> -    ls_pg_rec = xzalloc(sizeof *ls_pg_rec);
> -    *ls_pg_rec = (struct ls_port_group_record) {
> -        .nb_pg = nb_pg,
> -        .ports = SSET_INITIALIZER(&ls_pg_rec->ports),
> -    };
> -    hmap_insert(&ls_pg->nb_pgs, &ls_pg_rec->key_node, hash);
> -done:
>      sset_add(&ls_pg_rec->ports, port_name);
>      return ls_pg_rec;
>  }
>
> +static struct ls_port_group_record *
> +ls_port_group_record_find(struct ls_port_group *ls_pg,
> +                          const struct nbrec_port_group *nb_pg)
> +{
> +    size_t hash = uuid_hash(&nb_pg->header_.uuid);
> +    struct ls_port_group_record *ls_pg_rec;
> +
> +    HMAP_FOR_EACH_WITH_HASH (ls_pg_rec, key_node, hash, &ls_pg->nb_pgs) {
> +        if (ls_pg_rec->nb_pg == nb_pg) {
> +            return ls_pg_rec;
> +        }
> +    }
> +    return NULL;
> +}
> +
> +
>  static void
>  ls_port_group_record_destroy(struct ls_port_group *ls_pg,
>                               struct ls_port_group_record *ls_pg_rec)
> @@ -237,6 +372,71 @@ ls_port_group_record_destroy(struct ls_port_group
*ls_pg,
>      }
>  }
>
> +void
> +port_group_to_ls_table_init(struct port_group_to_ls_table *table)
> +{
> +    *table = (struct port_group_to_ls_table) {
> +        .entries = HMAP_INITIALIZER(&table->entries),
> +    };
> +}
> +
> +void
> +port_group_to_ls_table_clear(struct port_group_to_ls_table *table)
> +{
> +    struct port_group_to_ls *pg_ls;
> +    HMAP_FOR_EACH_SAFE (pg_ls, key_node, &table->entries) {
> +        port_group_to_ls_destroy(table, pg_ls);
> +    }
> +}
> +
> +void
> +port_group_to_ls_table_destroy(struct port_group_to_ls_table *table)
> +{
> +    port_group_to_ls_table_clear(table);
> +    hmap_destroy(&table->entries);
> +}
> +
> +struct port_group_to_ls *
> +port_group_to_ls_table_find(const struct port_group_to_ls_table *table,
> +                            const struct nbrec_port_group *nb_pg)
> +{
> +    struct port_group_to_ls *pg_ls;
> +
> +    HMAP_FOR_EACH_WITH_HASH (pg_ls, key_node,
uuid_hash(&nb_pg->header_.uuid),
> +                             &table->entries) {
> +        if (nb_pg == pg_ls->nb_pg) {
> +            return pg_ls;
> +        }
> +    }
> +    return NULL;
> +}
> +
> +static struct port_group_to_ls *
> +port_group_to_ls_create(struct port_group_to_ls_table *table,
> +                        const struct nbrec_port_group *nb_pg)
> +{
> +    struct port_group_to_ls *pg_ls = xmalloc(sizeof *pg_ls);
> +
> +    *pg_ls = (struct port_group_to_ls) {
> +        .nb_pg = nb_pg,
> +        .switches = HMAPX_INITIALIZER(&pg_ls->switches),
> +    };
> +    hmap_insert(&table->entries, &pg_ls->key_node,
> +                uuid_hash(&nb_pg->header_.uuid));
> +    return pg_ls;
> +}
> +
> +static void
> +port_group_to_ls_destroy(struct port_group_to_ls_table *table,
> +                         struct port_group_to_ls *pg_ls)
> +{
> +    if (pg_ls) {
> +        hmapx_destroy(&pg_ls->switches);
> +        hmap_remove(&table->entries, &pg_ls->key_node);
> +        free(pg_ls);
> +    }
> +}
> +
>  /* Incremental processing implementation. */
>  static struct port_group_input
>  port_group_get_input_data(struct engine_node *node)
> @@ -259,6 +459,7 @@ en_port_group_init(struct engine_node *node
OVS_UNUSED,
>      struct port_group_data *pg_data = xmalloc(sizeof *pg_data);
>
>      ls_port_group_table_init(&pg_data->ls_port_groups);
> +    port_group_to_ls_table_init(&pg_data->port_groups_to_ls);
>      return pg_data;
>  }
>
> @@ -268,6 +469,15 @@ en_port_group_cleanup(void *data_)
>      struct port_group_data *data = data_;
>
>      ls_port_group_table_destroy(&data->ls_port_groups);
> +    port_group_to_ls_table_destroy(&data->port_groups_to_ls);
> +}
> +
> +void
> +en_port_group_clear_tracked_data(void *data_)
> +{
> +    struct port_group_data *data = data_;
> +
> +    data->ls_port_groups_sets_unchanged = false;
>  }
>
>  void
> @@ -280,7 +490,10 @@ en_port_group_run(struct engine_node *node, void
*data_)
>      stopwatch_start(PORT_GROUP_RUN_STOPWATCH_NAME, time_msec());
>
>      ls_port_group_table_clear(&data->ls_port_groups);
> +    port_group_to_ls_table_clear(&data->port_groups_to_ls);
> +
>      ls_port_group_table_build(&data->ls_port_groups,
> +                              &data->port_groups_to_ls,
>                                input_data.nbrec_port_group_table,
>                                input_data.ls_ports);
>
> @@ -291,3 +504,133 @@ en_port_group_run(struct engine_node *node, void
*data_)
>      stopwatch_stop(PORT_GROUP_RUN_STOPWATCH_NAME, time_msec());
>      engine_set_node_state(node, EN_UPDATED);
>  }
> +
> +bool
> +port_group_nb_port_group_handler(struct engine_node *node, void *data_)
> +{
> +    struct port_group_input input_data = port_group_get_input_data(node);
> +    struct port_group_data *data = data_;
> +    bool success = true;
> +
> +    const struct nbrec_port_group_table *nb_pg_table =
> +        EN_OVSDB_GET(engine_get_input("NB_port_group", node));
> +    const struct nbrec_port_group *nb_pg;
> +
> +    /* Return false if a port group is created or deleted.
> +     * Handle I-P for only updated port groups. */
> +    NBREC_PORT_GROUP_TABLE_FOR_EACH_TRACKED (nb_pg, nb_pg_table) {
> +        if (nbrec_port_group_is_new(nb_pg) ||
> +                nbrec_port_group_is_deleted(nb_pg)) {
> +            return false;
> +        }
> +    }
> +
> +    struct hmapx updated_ls_port_groups =
> +        HMAPX_INITIALIZER(&updated_ls_port_groups);
> +
> +    NBREC_PORT_GROUP_TABLE_FOR_EACH_TRACKED (nb_pg, nb_pg_table) {
> +        /* Newly created port groups can't be incrementally processed;
> +         * the rest yes. */

The comment here is incorrect and irrelevant, because apparently newly
created port groups and deleted port groups are both excluded in the
previous loop.

> +        if (ls_port_group_process(&data->ls_port_groups,
> +                                  &data->port_groups_to_ls,
> +                                  input_data.ls_ports,
> +                                  nb_pg, &updated_ls_port_groups)) {
> +            success = false;
> +            break;

Related to my comments above in the ls_port_group_process, I think the goal
here is to tell if there are PG changes that would impact ACL related
lflows in any logical switches. The current ls_port_group_process returns
true only if ls_pg is created or destroyed, but didn't detect the cases
when:
1. a new PG is added to an existing ls_pg, or
2. when a PG is deleted from a ls_pg but the ls_pg still contains other PGs
and so was not destroyed.

For these cases, we should set data->ls_port_groups_sets_unchanged to
false, so that in lflow node we can fallback to compute.
Case 1 still works in current implementation, but it is because in the
below code when updating SB it would find that the SB PG for the new ls_pg
doesn't exist and it fallback to recompute (while the SB updating logic
could actually incrementally create the PG in SB - see comment below).
Case 2 doesn't work with the current implementation, I think. And the
scenario is in fact not covered in the test case.

> +        }
> +    }
> +
> +    /* If changes have been successfully processed incrementally then
update
> +     * the SB too. */
> +    if (success) {

If ls_port_group_process returns false, it only means that there may be ACL
related flows that need to be updated for the related logical switches, but
the port_group node still can be incrementally processed. The
data->ls_port_groups_sets_unchanged == false is sufficient to indicate that
the lflow node needs to recompute.

> +        struct ovsdb_idl_index *sbrec_port_group_by_name =
> +            engine_ovsdb_node_get_index(
> +                    engine_get_input("SB_port_group", node),
> +                    "sbrec_port_group_by_name");
> +        struct ds sb_pg_name = DS_EMPTY_INITIALIZER;
> +
> +        struct hmapx_node *updated_node;
> +        HMAPX_FOR_EACH (updated_node, &updated_ls_port_groups) {
> +            const struct ls_port_group *ls_pg = updated_node->data;
> +            struct ls_port_group_record *ls_pg_rec;
> +
> +            HMAP_FOR_EACH (ls_pg_rec, key_node, &ls_pg->nb_pgs) {
> +                get_sb_port_group_name(ls_pg_rec->nb_pg->name,
> +                                        ls_pg->sb_datapath_key,
> +                                        &sb_pg_name);
> +
> +                const struct sbrec_port_group *sb_pg =
> +
 sb_port_group_lookup_by_name(sbrec_port_group_by_name,
> +                                                 ds_cstr(&sb_pg_name));
> +                if (!sb_pg) {
> +                    success = false;

If sb_pg is not found, it can still be incrementally created, and there is
no need to recompute in this node.

> +                    break;
> +                }
> +                struct sorted_array nb_ports =
> +                    sorted_array_from_sset(&ls_pg_rec->ports);
> +                update_sb_port_group(&nb_ports, sb_pg);
> +                sorted_array_destroy(&nb_ports);
> +            }
> +        }
> +        ds_destroy(&sb_pg_name);

I expect this function to handle the deletion of the SB port_groups that
are no longer needed, but I didn't see such logic. Did I miss anything?

> +    }
> +
> +    data->ls_port_groups_sets_unchanged = success;
> +    engine_set_node_state(node, EN_UPDATED);
> +    hmapx_destroy(&updated_ls_port_groups);
> +    return success;
> +}
> +
> +static void
> +sb_port_group_apply_diff(const void *arg, const char *item, bool add)
> +{
> +    const struct sbrec_port_group *pg = arg;
> +    if (add) {
> +        sbrec_port_group_update_ports_addvalue(pg, item);
> +    } else {
> +        sbrec_port_group_update_ports_delvalue(pg, item);
> +    }
> +}
> +
> +static void
> +update_sb_port_group(struct sorted_array *nb_ports,
> +                     const struct sbrec_port_group *sb_pg)
> +{
> +    struct sorted_array sb_ports = sorted_array_from_dbrec(sb_pg, ports);
> +    sorted_array_apply_diff(nb_ports, &sb_ports,
> +                            sb_port_group_apply_diff, sb_pg);
> +    sorted_array_destroy(&sb_ports);
> +}
> +
> +static void
> +sync_port_group(struct ovsdb_idl_txn *ovnsb_txn, const char *sb_pg_name,
> +                struct sorted_array *ports,
> +                struct shash *sb_port_groups)
> +{
> +    const struct sbrec_port_group *sb_port_group =
> +        shash_find_and_delete(sb_port_groups, sb_pg_name);
> +    if (!sb_port_group) {
> +        sb_port_group = sbrec_port_group_insert(ovnsb_txn);
> +        sbrec_port_group_set_name(sb_port_group, sb_pg_name);
> +        sbrec_port_group_set_ports(sb_port_group, ports->arr, ports->n);
> +    } else {
> +        update_sb_port_group(ports, sb_port_group);
> +    }
> +}
> +
> +/* Finds and returns the port group set with the given 'name', or NULL
> + * if no such port group exists. */
> +static const struct sbrec_port_group *
> +sb_port_group_lookup_by_name(struct ovsdb_idl_index
*sbrec_port_group_by_name,
> +                             const char *name)
> +{
> +    struct sbrec_port_group *target = sbrec_port_group_index_init_row(
> +        sbrec_port_group_by_name);
> +    sbrec_port_group_index_set_name(target, name);
> +
> +    struct sbrec_port_group *retval = sbrec_port_group_index_find(
> +        sbrec_port_group_by_name, target);
> +
> +    sbrec_port_group_index_destroy_row(target);
> +    return retval;
> +}
> diff --git a/northd/en-port-group.h b/northd/en-port-group.h
> index 5cbf6c6c4a..c3975f64ee 100644
> --- a/northd/en-port-group.h
> +++ b/northd/en-port-group.h
> @@ -18,6 +18,7 @@
>
>  #include <stdint.h>
>
> +#include "lib/hmapx.h"
>  #include "lib/inc-proc-eng.h"
>  #include "lib/ovn-nb-idl.h"
>  #include "lib/ovn-sb-idl.h"
> @@ -54,9 +55,33 @@ struct ls_port_group *ls_port_group_table_find(
>      const struct ls_port_group_table *,
>      const struct nbrec_logical_switch *);
>
> -void ls_port_group_table_build(struct ls_port_group_table
*ls_port_groups,
> -                               const struct nbrec_port_group_table *,
> -                               const struct hmap *ls_ports);
> +/* Per port group map of datapaths with ports in the group. */
> +struct port_group_to_ls_table {
> +    struct hmap entries; /* Stores struct port_group_to_ls. */
> +};
> +
> +struct port_group_to_ls {
> +    struct hmap_node key_node; /* Index on 'pg->header_.uuid'. */
> +
> +    const struct nbrec_port_group *nb_pg;
> +
> +    /* Map of 'struct nbrec_logical_switch *' with ports in the group. */
> +    struct hmapx switches;
> +};
> +
> +void port_group_to_ls_table_init(struct port_group_to_ls_table *);
> +void port_group_to_ls_table_clear(struct port_group_to_ls_table *);
> +void port_group_to_ls_table_destroy(struct port_group_to_ls_table *);
> +
> +struct port_group_to_ls *port_group_to_ls_table_find(
> +    const struct port_group_to_ls_table *,
> +    const struct nbrec_port_group *);
> +
> +void ls_port_group_table_build(
> +    struct ls_port_group_table *ls_port_groups,
> +    struct port_group_to_ls_table *port_group_to_switches,

nit: The variable name for the same struct is port_groups_to_ls in the
struct port_group_data. Better to be consistent.

> +    const struct nbrec_port_group_table *,
> +    const struct hmap *ls_ports);
>  void ls_port_group_table_sync(const struct ls_port_group_table
*ls_port_groups,
>                                const struct sbrec_port_group_table *,
>                                struct ovsdb_idl_txn *ovnsb_txn);
> @@ -75,10 +100,15 @@ struct port_group_input {
>
>  struct port_group_data {
>      struct ls_port_group_table ls_port_groups;
> +    struct port_group_to_ls_table port_groups_to_ls;

nit: better to use consistent naming convention. For the struct name:
ls_port_group_table v.s. port_group_ls_table, or ls_to_port_group_table
v.s. port_group_to_ls_table.
And for the variable name: ls_port_groups v.s. port_group_lses

> +    bool ls_port_groups_sets_unchanged;
>  };
>
>  void *en_port_group_init(struct engine_node *, struct engine_arg *);
>  void en_port_group_cleanup(void *data);
> +void en_port_group_clear_tracked_data(void *data);
>  void en_port_group_run(struct engine_node *, void *data);
>
> +bool port_group_nb_port_group_handler(struct engine_node *, void *data);
> +
>  #endif /* EN_PORT_GROUP_H */
> diff --git a/northd/inc-proc-northd.c b/northd/inc-proc-northd.c
> index 6d5f9e8d16..bd598ba5e2 100644
> --- a/northd/inc-proc-northd.c
> +++ b/northd/inc-proc-northd.c
> @@ -137,7 +137,7 @@ static ENGINE_NODE(mac_binding_aging_waker,
"mac_binding_aging_waker");
>  static ENGINE_NODE(northd_output, "northd_output");
>  static ENGINE_NODE(sync_to_sb, "sync_to_sb");
>  static ENGINE_NODE(sync_to_sb_addr_set, "sync_to_sb_addr_set");
> -static ENGINE_NODE(port_group, "port_group");
> +static ENGINE_NODE_WITH_CLEAR_TRACK_DATA(port_group, "port_group");
>  static ENGINE_NODE(fdb_aging, "fdb_aging");
>  static ENGINE_NODE(fdb_aging_waker, "fdb_aging_waker");
>
> @@ -193,7 +193,7 @@ void inc_proc_northd_init(struct ovsdb_idl_loop *nb,
>      engine_add_input(&en_lflow, &en_sb_multicast_group, NULL);
>      engine_add_input(&en_lflow, &en_sb_igmp_group, NULL);
>      engine_add_input(&en_lflow, &en_northd, lflow_northd_handler);
> -    engine_add_input(&en_lflow, &en_port_group, NULL);
> +    engine_add_input(&en_lflow, &en_port_group,
lflow_port_group_handler);
>
>      engine_add_input(&en_sync_to_sb_addr_set, &en_nb_address_set,
>                       sync_to_sb_addr_set_nb_address_set_handler);
> @@ -202,7 +202,8 @@ void inc_proc_northd_init(struct ovsdb_idl_loop *nb,
>      engine_add_input(&en_sync_to_sb_addr_set, &en_northd, NULL);
>      engine_add_input(&en_sync_to_sb_addr_set, &en_sb_address_set, NULL);
>
> -    engine_add_input(&en_port_group, &en_nb_port_group, NULL);
> +    engine_add_input(&en_port_group, &en_nb_port_group,
> +                     port_group_nb_port_group_handler);
>      engine_add_input(&en_port_group, &en_sb_port_group, NULL);
>      /* No need for an explicit handler for northd changes.  Port changes
>       * that affect port_groups trigger updates to the NB.Port_Group
> @@ -287,6 +288,12 @@ void inc_proc_northd_init(struct ovsdb_idl_loop *nb,
>                                  "sbrec_address_set_by_name",
>                                  sbrec_address_set_by_name);
>
> +    struct ovsdb_idl_index *sbrec_port_group_by_name
> +        = ovsdb_idl_index_create1(sb->idl, &sbrec_port_group_col_name);
> +    engine_ovsdb_node_add_index(&en_sb_port_group,
> +                                "sbrec_port_group_by_name",
> +                                sbrec_port_group_by_name);
> +
>      struct ovsdb_idl_index *sbrec_fdb_by_dp_and_port
>          = ovsdb_idl_index_create2(sb->idl, &sbrec_fdb_col_dp_key,
>                                    &sbrec_fdb_col_port_key);
> diff --git a/northd/ovn-northd.c b/northd/ovn-northd.c
> index 4fa1b039ea..44385d604c 100644
> --- a/northd/ovn-northd.c
> +++ b/northd/ovn-northd.c
> @@ -836,6 +836,10 @@ main(int argc, char *argv[])
>          ovsdb_idl_omit_alert(ovnsb_idl_loop.idl,
>                               &sbrec_multicast_group_columns[i]);
>      }
> +    for (size_t i = 0; i < SBREC_PORT_GROUP_N_COLUMNS; i++) {
> +        ovsdb_idl_omit_alert(ovnsb_idl_loop.idl,
> +                             &sbrec_port_group_columns[i]);
> +    }
>
>      unixctl_command_register("sb-connection-status", "", 0, 0,
>                               ovn_conn_show, ovnsb_idl_loop.idl);
> diff --git a/tests/ovn-northd.at b/tests/ovn-northd.at
> index 1a12513d7a..a04ba2b23f 100644
> --- a/tests/ovn-northd.at
> +++ b/tests/ovn-northd.at
> @@ -8936,6 +8936,252 @@ AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE
inc-engine/show-stats sync_to_sb_a
>  AT_CLEANUP
>  ])
>
> +OVN_FOR_EACH_NORTHD_NO_HV([
> +AT_SETUP([Port group incremental processing])
> +ovn_start
> +
> +check ovn-nbctl ls-add sw1 \
> +  -- lsp-add sw1 sw1.1     \
> +  -- lsp-add sw1 sw1.2     \
> +  -- lsp-add sw1 sw1.3     \
> +  -- ls-add sw2            \
> +  -- lsp-add sw2 sw2.1     \
> +  -- lsp-add sw2 sw2.2     \
> +  -- lsp-add sw2 sw2.3
> +
> +check ovn-nbctl --wait=sb sync
> +sw1_key=$(fetch_column Datapath_Binding tunnel_key external_ids:name=sw1)
> +sw2_key=$(fetch_column Datapath_Binding tunnel_key external_ids:name=sw2)
> +
> +check_acl_lflows() {
> +AT_CHECK_UNQUOTED([ovn-sbctl lflow-list sw1 | grep ls_in_acl_eval | grep
eth.src==41:41:41:41:41:41 -c], [ignore], [dnl
> +$1
> +])
> +AT_CHECK_UNQUOTED([ovn-sbctl lflow-list sw1 | grep ls_in_acl_eval | grep
eth.src==42:42:42:42:42:42 -c], [ignore], [dnl
> +$2
> +])
> +AT_CHECK_UNQUOTED([ovn-sbctl lflow-list sw2 | grep ls_in_acl_eval | grep
eth.src==41:41:41:41:41:41 -c], [ignore], [dnl
> +$3
> +])
> +AT_CHECK_UNQUOTED([ovn-sbctl lflow-list sw2 | grep ls_in_acl_eval | grep
eth.src==42:42:42:42:42:42 -c], [ignore], [dnl
> +$4
> +])
> +}
> +
> +AS_BOX([Create new PG1 and PG2])
> +check as northd ovn-appctl -t NORTHD_TYPE inc-engine/clear-stats
> +check ovn-nbctl --wait=sb -- pg-add pg1 -- pg-add pg2
> +dnl The northd node should not recompute, it should handle nb_global
update
> +dnl though, therefore "compute: 1".
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
northd], [0], [dnl
> +Node: northd
> +- recompute:            0
> +- compute:              1
> +- abort:                0
> +])
> +dnl The port_group node recomputes every time a NB port group is
added/deleted.
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
port_group], [0], [dnl
> +Node: port_group
> +- recompute:            1
> +- compute:              0
> +- abort:                0
> +])
> +dnl The port_group node is an input for the lflow node.  Port_group
> +dnl recompute/compute triggers lflow recompute.
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
lflow], [0], [dnl
> +Node: lflow
> +- recompute:            1
> +- compute:              0
> +- abort:                0
> +])
> +

It is highly recommended to extend the _DUMP_DB_TABLES macro to include the
SB port_group table, and call the CHECK_NO_CHANGE_AFTER_RECOMPUTE macro
after every sub-test to verify the incremental processing is correct (i.e.
equivalent to the result of a recompute).

> +AS_BOX([Add ACLs on PG1 and PG2])
> +check ovn-nbctl --wait=sb             \
> +  -- acl-add pg1 from-lport 1 eth.src==41:41:41:41:41:41 allow \
> +  -- acl-add pg2 from-lport 1 eth.src==42:42:42:42:42:42 allow
> +
> +AS_BOX([Add one port from the two switches to PG1])
> +check as northd ovn-appctl -t NORTHD_TYPE inc-engine/clear-stats
> +check ovn-nbctl --wait=sb         \
> +  -- pg-set-ports pg1 sw1.1 sw2.1
> +check_column "sw1.1" sb:Port_Group ports name="${sw1_key}_pg1"
> +check_column "sw2.1" sb:Port_Group ports name="${sw2_key}_pg1"
> +
> +dnl The northd node should not recompute, it should handle nb_global
update
> +dnl though, therefore "compute: 1".
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
northd], [0], [dnl
> +Node: northd
> +- recompute:            0
> +- compute:              1
> +- abort:                0
> +])
> +dnl The port_group node recomputes also every time a port from a new
switch
> +dnl is added to the group.
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
port_group], [0], [dnl
> +Node: port_group
> +- recompute:            1
> +- compute:              0
> +- abort:                0
> +])
> +dnl The port_group node is an input for the lflow node.  Port_group
> +dnl recompute/compute triggers lflow recompute.
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
lflow], [0], [dnl
> +Node: lflow
> +- recompute:            1
> +- compute:              0
> +- abort:                0
> +])
> +dnl Expect ACL1 on sw1 and sw2
> +check_acl_lflows 1 0 1 0
> +
> +AS_BOX([Add one port from the two switches to PG2])
> +check as northd ovn-appctl -t NORTHD_TYPE inc-engine/clear-stats
> +check ovn-nbctl --wait=sb \
> +  -- pg-set-ports pg2 sw1.2 sw2.2
> +check_column "sw1.1" sb:Port_Group ports name="${sw1_key}_pg1"
> +check_column "sw2.1" sb:Port_Group ports name="${sw2_key}_pg1"
> +check_column "sw1.2" sb:Port_Group ports name="${sw1_key}_pg2"
> +check_column "sw2.2" sb:Port_Group ports name="${sw2_key}_pg2"
> +
> +dnl The northd node should not recompute, it should handle nb_global
update
> +dnl though, therefore "compute: 1".
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
northd], [0], [dnl
> +Node: northd
> +- recompute:            0
> +- compute:              1
> +- abort:                0
> +])
> +dnl The port_group node recomputes also every time a port from a new
switch
> +dnl is added to the group.
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
port_group], [0], [dnl
> +Node: port_group
> +- recompute:            1
> +- compute:              0
> +- abort:                0
> +])
> +dnl The port_group node is an input for the lflow node.  Port_group
> +dnl recompute/compute triggers lflow recompute (for ACLs).
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
lflow], [0], [dnl
> +Node: lflow
> +- recompute:            1
> +- compute:              0
> +- abort:                0
> +])
> +dnl Expect both ACLs on sw1 and sw2
> +check_acl_lflows 1 1 1 1
> +
> +AS_BOX([Add one more port from the two switches to PG1 and PG2])
> +check as northd ovn-appctl -t NORTHD_TYPE inc-engine/clear-stats
> +check ovn-nbctl --wait=sb                     \
> +  -- pg-set-ports pg1 sw1.1 sw2.1 sw1.3 sw2.3 \
> +  -- pg-set-ports pg2 sw1.2 sw2.2 sw1.3 sw2.3
> +check_column "sw1.1 sw1.3" sb:Port_Group ports name="${sw1_key}_pg1"
> +check_column "sw2.1 sw2.3" sb:Port_Group ports name="${sw2_key}_pg1"
> +check_column "sw1.2 sw1.3" sb:Port_Group ports name="${sw1_key}_pg2"
> +check_column "sw2.2 sw2.3" sb:Port_Group ports name="${sw2_key}_pg2"
> +
> +dnl The northd node should not recompute, it should handle nb_global
update
> +dnl though, therefore "compute: 1".
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
northd], [0], [dnl
> +Node: northd
> +- recompute:            0
> +- compute:              1
> +- abort:                0
> +])
> +dnl We did not change the set of switches a pg is applied to, there
should be
> +dnl no recompute.
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
port_group], [0], [dnl
> +Node: port_group
> +- recompute:            0
> +- compute:              1
> +- abort:                0
> +])
> +dnl We did not change the set of switches a pg is applied to, there
should be
> +dnl no recompute.
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
lflow], [0], [dnl
> +Node: lflow
> +- recompute:            0
> +- compute:              1
> +- abort:                0
> +])
> +dnl Expect both ACLs on sw1 and sw2
> +check_acl_lflows 1 1 1 1
> +
> +AS_BOX([Remove the last port from PG1 and PG2])
> +check as northd ovn-appctl -t NORTHD_TYPE inc-engine/clear-stats
> +check ovn-nbctl --wait=sb         \
> +  -- pg-set-ports pg1 sw1.1 sw2.1 \
> +  -- pg-set-ports pg2 sw1.2 sw2.2
> +check_column "sw1.1" sb:Port_Group ports name="${sw1_key}_pg1"
> +check_column "sw2.1" sb:Port_Group ports name="${sw2_key}_pg1"
> +check_column "sw1.2" sb:Port_Group ports name="${sw1_key}_pg2"
> +check_column "sw2.2" sb:Port_Group ports name="${sw2_key}_pg2"
> +
> +dnl The northd node should not recompute, it should handle nb_global
update
> +dnl though, therefore "compute: 1".
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
northd], [0], [dnl
> +Node: northd
> +- recompute:            0
> +- compute:              1
> +- abort:                0
> +])
> +dnl We did not change the set of switches a pg is applied to, there
should be
> +dnl no recompute.
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
port_group], [0], [dnl
> +Node: port_group
> +- recompute:            0
> +- compute:              1
> +- abort:                0
> +])
> +dnl We did not change the set of switches a pg is applied to, there
should be
> +dnl no recompute.
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
lflow], [0], [dnl
> +Node: lflow
> +- recompute:            0
> +- compute:              1
> +- abort:                0
> +])
> +dnl Expect both ACLs on sw1 and sw2
> +check_acl_lflows 1 1 1 1
> +
> +AS_BOX([Remove the second port from PG1 and PG2])

The below case is testing when both ports of sw2 are removed from pg1 and
pg2, which surely should be tested, but we are missing the case when each
switch is removed from one of the PG but still contain the other PG, which
should also be tested.

Thanks,
Han

> +check as northd ovn-appctl -t NORTHD_TYPE inc-engine/clear-stats
> +check ovn-nbctl --wait=sb         \
> +  -- pg-set-ports pg1 sw1.1 \
> +  -- pg-set-ports pg2 sw1.2
> +check_column "sw1.1" sb:Port_Group ports name="${sw1_key}_pg1"
> +check_column "sw1.2" sb:Port_Group ports name="${sw1_key}_pg2"
> +
> +dnl The northd node should not recompute, it should handle nb_global
update
> +dnl though, therefore "compute: 1".
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
northd], [0], [dnl
> +Node: northd
> +- recompute:            0
> +- compute:              1
> +- abort:                0
> +])
> +dnl We did changed the set of switches a pg is applied to, there should
be
> +dnl a recompute.
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
port_group], [0], [dnl
> +Node: port_group
> +- recompute:            1
> +- compute:              0
> +- abort:                0
> +])
> +dnl We did changed the set of switches a pg is applied to, there should
be
> +dnl a recompute (for ACLs).
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
lflow], [0], [dnl
> +Node: lflow
> +- recompute:            1
> +- compute:              0
> +- abort:                0
> +])
> +dnl Expect both ACLs on sw1 and not on sw2.
> +check_acl_lflows 1 1 0 0
> +
> +AT_CLEANUP
> +])
> +
>  OVN_FOR_EACH_NORTHD([
>  AT_SETUP([Check default drop])
>  AT_KEYWORDS([drop])
>
Dumitru Ceara Aug. 30, 2023, 10:39 a.m. UTC | #3
On 8/22/23 08:58, Ales Musil wrote:
> On Thu, Aug 10, 2023 at 2:45 PM Dumitru Ceara <dceara@redhat.com> wrote:
> 
>> It's similar to the processing we do for address sets.  There's a bit
>> more mechanics involved due to the fact that we need to split NB port
>> groups per datapath.
>>
>> We currently only partially implement incremental processing of
>> port_group changes in the lflow node.  That is, we deal with the case
>> when the sets of "switches per port group" doesn't change.  In that
>> specific case ACL lflows don't need to be reprocessed.
>>
>> In a synthetic benchmark that created (in this order):
>> - 500 switches
>> - 2000 port groups
>> - 4 ACLs per port group
>> - 10000 ports distributed equally between the switches and port groups
>>
>> we measured the following ovn-northd CPU usage:
>>
>>   +-------------------------+------------+--------------------+
>>   | Incremental processing? | --wait=sb? | northd avg cpu (%) |
>>   +-------------------------+------------+--------------------+
>>   |           N             |     Y      |        84.2        |
>>   +-------------------------+------------+--------------------+
>>   |           Y             |     Y      |        41.5        |
>>   +-------------------------+------------+--------------------+
>>   |           N             |     N      |        93.2        |
>>   +-------------------------+------------+--------------------+
>>   |           Y             |     N      |        53.6        |
>>   +-------------------------+------------+--------------------+
>>
>> where '--wait=sb' set to 'Y'  means the benchmark was waiting for the
>> port and port group operations to be propagated to the Southbound DB
>> before continuing to the next operation.
>>
>> Reported-at: https://bugzilla.redhat.com/show_bug.cgi?id=2228162
>> Signed-off-by: Dumitru Ceara <dceara@redhat.com>
>> ---
>>
> 
> Hi Dumitru,
> 
> I have a couple of comments down below.
> 

Hi Ales,

Thanks for the review!

[..]

>>  static struct ls_port_group_record *
>>  ls_port_group_record_add(struct ls_port_group *ls_pg,
>>                           const struct nbrec_port_group *nb_pg,
>>                           const char *port_name)
>>  {
>> -    struct ls_port_group_record *ls_pg_rec = NULL;
>> +    struct ls_port_group_record *ls_pg_rec =
>> +        ls_port_group_record_find(ls_pg, nb_pg);
>>      size_t hash = uuid_hash(&nb_pg->header_.uuid);
>>
>> -    HMAP_FOR_EACH_WITH_HASH (ls_pg_rec, key_node, hash, &ls_pg->nb_pgs) {
>> -        if (ls_pg_rec->nb_pg == nb_pg) {
>> -            goto done;
>> -        }
>> +    if (!ls_pg_rec) {
>> +        ls_pg_rec = xzalloc(sizeof *ls_pg_rec);
>>
> 
> nit: No need for zeroed alloc as all the fields are immediately overwritten.
> 

Fair, I'll change it to xmalloc().

> +        *ls_pg_rec = (struct ls_port_group_record) {
>> +            .nb_pg = nb_pg,
>> +            .ports = SSET_INITIALIZER(&ls_pg_rec->ports),
>> +        };
>> +        hmap_insert(&ls_pg->nb_pgs, &ls_pg_rec->key_node, hash);
>>      }
>>
>> -    ls_pg_rec = xzalloc(sizeof *ls_pg_rec);
>> -    *ls_pg_rec = (struct ls_port_group_record) {
>> -        .nb_pg = nb_pg,
>> -        .ports = SSET_INITIALIZER(&ls_pg_rec->ports),
>> -    };
>> -    hmap_insert(&ls_pg->nb_pgs, &ls_pg_rec->key_node, hash);
>> -done:
>>      sset_add(&ls_pg_rec->ports, port_name);
>>      return ls_pg_rec;
>>  }
>>
>> +static struct ls_port_group_record *
>> +ls_port_group_record_find(struct ls_port_group *ls_pg,
>> +                          const struct nbrec_port_group *nb_pg)
>> +{
>> +    size_t hash = uuid_hash(&nb_pg->header_.uuid);
>> +    struct ls_port_group_record *ls_pg_rec;
>> +
>> +    HMAP_FOR_EACH_WITH_HASH (ls_pg_rec, key_node, hash, &ls_pg->nb_pgs) {
>> +        if (ls_pg_rec->nb_pg == nb_pg) {
>> +            return ls_pg_rec;
>> +        }
>> +    }
>> +    return NULL;
>> +}
>> +
>> +
>>  static void
>>  ls_port_group_record_destroy(struct ls_port_group *ls_pg,
>>                               struct ls_port_group_record *ls_pg_rec)
>> @@ -237,6 +372,71 @@ ls_port_group_record_destroy(struct ls_port_group
>> *ls_pg,
>>      }
>>  }
>>
>> +void
>> +port_group_to_ls_table_init(struct port_group_to_ls_table *table)
>> +{
>> +    *table = (struct port_group_to_ls_table) {
>> +        .entries = HMAP_INITIALIZER(&table->entries),
>> +    };
>> +}
>> +
>> +void
>> +port_group_to_ls_table_clear(struct port_group_to_ls_table *table)
>> +{
>> +    struct port_group_to_ls *pg_ls;
>> +    HMAP_FOR_EACH_SAFE (pg_ls, key_node, &table->entries) {
>> +        port_group_to_ls_destroy(table, pg_ls);
>> +    }
>> +}
>> +
>> +void
>> +port_group_to_ls_table_destroy(struct port_group_to_ls_table *table)
>> +{
>> +    port_group_to_ls_table_clear(table);
>> +    hmap_destroy(&table->entries);
>> +}
>> +
>> +struct port_group_to_ls *
>> +port_group_to_ls_table_find(const struct port_group_to_ls_table *table,
>> +                            const struct nbrec_port_group *nb_pg)
>> +{
>> +    struct port_group_to_ls *pg_ls;
>> +
>> +    HMAP_FOR_EACH_WITH_HASH (pg_ls, key_node,
>> uuid_hash(&nb_pg->header_.uuid),
>> +                             &table->entries) {
>>
> 
> We should move the uuid_hash call outside the loop.
> 
> 

As Han pointed out, that's not really needed.  I'll leave it as is.

Thanks,
Dumitru

>> +        if (nb_pg == pg_ls->nb_pg) {
>> +            return pg_ls;
>> +        }
>> +    }
>> +    return NULL;
>> +}
>> +
>> +static struct port_group_to_ls *
>> +port_group_to_ls_create(struct port_group_to_ls_table *table,
>> +                        const struct nbrec_port_group *nb_pg)
>> +{
>> +    struct port_group_to_ls *pg_ls = xmalloc(sizeof *pg_ls);
>> +
>> +    *pg_ls = (struct port_group_to_ls) {
>> +        .nb_pg = nb_pg,
>> +        .switches = HMAPX_INITIALIZER(&pg_ls->switches),
>> +    };
>> +    hmap_insert(&table->entries, &pg_ls->key_node,
>> +                uuid_hash(&nb_pg->header_.uuid));
>> +    return pg_ls;
>> +}
>> +
>> +static void
>> +port_group_to_ls_destroy(struct port_group_to_ls_table *table,
>> +                         struct port_group_to_ls *pg_ls)
>> +{
>> +    if (pg_ls) {
>> +        hmapx_destroy(&pg_ls->switches);
>> +        hmap_remove(&table->entries, &pg_ls->key_node);
>> +        free(pg_ls);
>> +    }
>> +}
>> +
>>  /* Incremental processing implementation. */
>>  static struct port_group_input
>>  port_group_get_input_data(struct engine_node *node)
>> @@ -259,6 +459,7 @@ en_port_group_init(struct engine_node *node OVS_UNUSED,
>>      struct port_group_data *pg_data = xmalloc(sizeof *pg_data);
>>
>>      ls_port_group_table_init(&pg_data->ls_port_groups);
>> +    port_group_to_ls_table_init(&pg_data->port_groups_to_ls);
>>      return pg_data;
>>  }
>>
>> @@ -268,6 +469,15 @@ en_port_group_cleanup(void *data_)
>>      struct port_group_data *data = data_;
>>
>>      ls_port_group_table_destroy(&data->ls_port_groups);
>> +    port_group_to_ls_table_destroy(&data->port_groups_to_ls);
>> +}
>> +
>> +void
>> +en_port_group_clear_tracked_data(void *data_)
>> +{
>> +    struct port_group_data *data = data_;
>> +
>> +    data->ls_port_groups_sets_unchanged = false;
>>  }
>>
>>  void
>> @@ -280,7 +490,10 @@ en_port_group_run(struct engine_node *node, void
>> *data_)
>>      stopwatch_start(PORT_GROUP_RUN_STOPWATCH_NAME, time_msec());
>>
>>      ls_port_group_table_clear(&data->ls_port_groups);
>> +    port_group_to_ls_table_clear(&data->port_groups_to_ls);
>> +
>>      ls_port_group_table_build(&data->ls_port_groups,
>> +                              &data->port_groups_to_ls,
>>                                input_data.nbrec_port_group_table,
>>                                input_data.ls_ports);
>>
>> @@ -291,3 +504,133 @@ en_port_group_run(struct engine_node *node, void
>> *data_)
>>      stopwatch_stop(PORT_GROUP_RUN_STOPWATCH_NAME, time_msec());
>>      engine_set_node_state(node, EN_UPDATED);
>>  }
>> +
>> +bool
>> +port_group_nb_port_group_handler(struct engine_node *node, void *data_)
>> +{
>> +    struct port_group_input input_data = port_group_get_input_data(node);
>> +    struct port_group_data *data = data_;
>> +    bool success = true;
>> +
>> +    const struct nbrec_port_group_table *nb_pg_table =
>> +        EN_OVSDB_GET(engine_get_input("NB_port_group", node));
>> +    const struct nbrec_port_group *nb_pg;
>> +
>> +    /* Return false if a port group is created or deleted.
>> +     * Handle I-P for only updated port groups. */
>> +    NBREC_PORT_GROUP_TABLE_FOR_EACH_TRACKED (nb_pg, nb_pg_table) {
>> +        if (nbrec_port_group_is_new(nb_pg) ||
>> +                nbrec_port_group_is_deleted(nb_pg)) {
>> +            return false;
>> +        }
>> +    }
>> +
>> +    struct hmapx updated_ls_port_groups =
>> +        HMAPX_INITIALIZER(&updated_ls_port_groups);
>> +
>> +    NBREC_PORT_GROUP_TABLE_FOR_EACH_TRACKED (nb_pg, nb_pg_table) {
>> +        /* Newly created port groups can't be incrementally processed;
>> +         * the rest yes. */
>> +        if (ls_port_group_process(&data->ls_port_groups,
>> +                                  &data->port_groups_to_ls,
>> +                                  input_data.ls_ports,
>> +                                  nb_pg, &updated_ls_port_groups)) {
>> +            success = false;
>> +            break;
>> +        }
>> +    }
>> +
>> +    /* If changes have been successfully processed incrementally then
>> update
>> +     * the SB too. */
>> +    if (success) {
>> +        struct ovsdb_idl_index *sbrec_port_group_by_name =
>> +            engine_ovsdb_node_get_index(
>> +                    engine_get_input("SB_port_group", node),
>> +                    "sbrec_port_group_by_name");
>> +        struct ds sb_pg_name = DS_EMPTY_INITIALIZER;
>> +
>> +        struct hmapx_node *updated_node;
>> +        HMAPX_FOR_EACH (updated_node, &updated_ls_port_groups) {
>> +            const struct ls_port_group *ls_pg = updated_node->data;
>> +            struct ls_port_group_record *ls_pg_rec;
>> +
>> +            HMAP_FOR_EACH (ls_pg_rec, key_node, &ls_pg->nb_pgs) {
>> +                get_sb_port_group_name(ls_pg_rec->nb_pg->name,
>> +                                        ls_pg->sb_datapath_key,
>> +                                        &sb_pg_name);
>> +
>> +                const struct sbrec_port_group *sb_pg =
>> +                    sb_port_group_lookup_by_name(sbrec_port_group_by_name,
>> +                                                 ds_cstr(&sb_pg_name));
>> +                if (!sb_pg) {
>> +                    success = false;
>> +                    break;
>> +                }
>> +                struct sorted_array nb_ports =
>> +                    sorted_array_from_sset(&ls_pg_rec->ports);
>> +                update_sb_port_group(&nb_ports, sb_pg);
>> +                sorted_array_destroy(&nb_ports);
>> +            }
>> +        }
>> +        ds_destroy(&sb_pg_name);
>> +    }
>> +
>> +    data->ls_port_groups_sets_unchanged = success;
>> +    engine_set_node_state(node, EN_UPDATED);
>> +    hmapx_destroy(&updated_ls_port_groups);
>> +    return success;
>> +}
>> +
>> +static void
>> +sb_port_group_apply_diff(const void *arg, const char *item, bool add)
>> +{
>> +    const struct sbrec_port_group *pg = arg;
>> +    if (add) {
>> +        sbrec_port_group_update_ports_addvalue(pg, item);
>> +    } else {
>> +        sbrec_port_group_update_ports_delvalue(pg, item);
>> +    }
>> +}
>> +
>> +static void
>> +update_sb_port_group(struct sorted_array *nb_ports,
>> +                     const struct sbrec_port_group *sb_pg)
>> +{
>> +    struct sorted_array sb_ports = sorted_array_from_dbrec(sb_pg, ports);
>> +    sorted_array_apply_diff(nb_ports, &sb_ports,
>> +                            sb_port_group_apply_diff, sb_pg);
>> +    sorted_array_destroy(&sb_ports);
>> +}
>> +
>> +static void
>> +sync_port_group(struct ovsdb_idl_txn *ovnsb_txn, const char *sb_pg_name,
>> +                struct sorted_array *ports,
>> +                struct shash *sb_port_groups)
>> +{
>> +    const struct sbrec_port_group *sb_port_group =
>> +        shash_find_and_delete(sb_port_groups, sb_pg_name);
>> +    if (!sb_port_group) {
>> +        sb_port_group = sbrec_port_group_insert(ovnsb_txn);
>> +        sbrec_port_group_set_name(sb_port_group, sb_pg_name);
>> +        sbrec_port_group_set_ports(sb_port_group, ports->arr, ports->n);
>> +    } else {
>> +        update_sb_port_group(ports, sb_port_group);
>> +    }
>> +}
>> +
>> +/* Finds and returns the port group set with the given 'name', or NULL
>> + * if no such port group exists. */
>> +static const struct sbrec_port_group *
>> +sb_port_group_lookup_by_name(struct ovsdb_idl_index
>> *sbrec_port_group_by_name,
>> +                             const char *name)
>> +{
>> +    struct sbrec_port_group *target = sbrec_port_group_index_init_row(
>> +        sbrec_port_group_by_name);
>> +    sbrec_port_group_index_set_name(target, name);
>> +
>> +    struct sbrec_port_group *retval = sbrec_port_group_index_find(
>> +        sbrec_port_group_by_name, target);
>> +
>> +    sbrec_port_group_index_destroy_row(target);
>> +    return retval;
>> +}
>> diff --git a/northd/en-port-group.h b/northd/en-port-group.h
>> index 5cbf6c6c4a..c3975f64ee 100644
>> --- a/northd/en-port-group.h
>> +++ b/northd/en-port-group.h
>> @@ -18,6 +18,7 @@
>>
>>  #include <stdint.h>
>>
>> +#include "lib/hmapx.h"
>>  #include "lib/inc-proc-eng.h"
>>  #include "lib/ovn-nb-idl.h"
>>  #include "lib/ovn-sb-idl.h"
>> @@ -54,9 +55,33 @@ struct ls_port_group *ls_port_group_table_find(
>>      const struct ls_port_group_table *,
>>      const struct nbrec_logical_switch *);
>>
>> -void ls_port_group_table_build(struct ls_port_group_table *ls_port_groups,
>> -                               const struct nbrec_port_group_table *,
>> -                               const struct hmap *ls_ports);
>> +/* Per port group map of datapaths with ports in the group. */
>> +struct port_group_to_ls_table {
>> +    struct hmap entries; /* Stores struct port_group_to_ls. */
>> +};
>> +
>> +struct port_group_to_ls {
>> +    struct hmap_node key_node; /* Index on 'pg->header_.uuid'. */
>> +
>> +    const struct nbrec_port_group *nb_pg;
>> +
>> +    /* Map of 'struct nbrec_logical_switch *' with ports in the group. */
>> +    struct hmapx switches;
>> +};
>> +
>> +void port_group_to_ls_table_init(struct port_group_to_ls_table *);
>> +void port_group_to_ls_table_clear(struct port_group_to_ls_table *);
>> +void port_group_to_ls_table_destroy(struct port_group_to_ls_table *);
>> +
>> +struct port_group_to_ls *port_group_to_ls_table_find(
>> +    const struct port_group_to_ls_table *,
>> +    const struct nbrec_port_group *);
>> +
>> +void ls_port_group_table_build(
>> +    struct ls_port_group_table *ls_port_groups,
>> +    struct port_group_to_ls_table *port_group_to_switches,
>> +    const struct nbrec_port_group_table *,
>> +    const struct hmap *ls_ports);
>>  void ls_port_group_table_sync(const struct ls_port_group_table
>> *ls_port_groups,
>>                                const struct sbrec_port_group_table *,
>>                                struct ovsdb_idl_txn *ovnsb_txn);
>> @@ -75,10 +100,15 @@ struct port_group_input {
>>
>>  struct port_group_data {
>>      struct ls_port_group_table ls_port_groups;
>> +    struct port_group_to_ls_table port_groups_to_ls;
>> +    bool ls_port_groups_sets_unchanged;
>>  };
>>
>>  void *en_port_group_init(struct engine_node *, struct engine_arg *);
>>  void en_port_group_cleanup(void *data);
>> +void en_port_group_clear_tracked_data(void *data);
>>  void en_port_group_run(struct engine_node *, void *data);
>>
>> +bool port_group_nb_port_group_handler(struct engine_node *, void *data);
>> +
>>  #endif /* EN_PORT_GROUP_H */
>> diff --git a/northd/inc-proc-northd.c b/northd/inc-proc-northd.c
>> index 6d5f9e8d16..bd598ba5e2 100644
>> --- a/northd/inc-proc-northd.c
>> +++ b/northd/inc-proc-northd.c
>> @@ -137,7 +137,7 @@ static ENGINE_NODE(mac_binding_aging_waker,
>> "mac_binding_aging_waker");
>>  static ENGINE_NODE(northd_output, "northd_output");
>>  static ENGINE_NODE(sync_to_sb, "sync_to_sb");
>>  static ENGINE_NODE(sync_to_sb_addr_set, "sync_to_sb_addr_set");
>> -static ENGINE_NODE(port_group, "port_group");
>> +static ENGINE_NODE_WITH_CLEAR_TRACK_DATA(port_group, "port_group");
>>  static ENGINE_NODE(fdb_aging, "fdb_aging");
>>  static ENGINE_NODE(fdb_aging_waker, "fdb_aging_waker");
>>
>> @@ -193,7 +193,7 @@ void inc_proc_northd_init(struct ovsdb_idl_loop *nb,
>>      engine_add_input(&en_lflow, &en_sb_multicast_group, NULL);
>>      engine_add_input(&en_lflow, &en_sb_igmp_group, NULL);
>>      engine_add_input(&en_lflow, &en_northd, lflow_northd_handler);
>> -    engine_add_input(&en_lflow, &en_port_group, NULL);
>> +    engine_add_input(&en_lflow, &en_port_group, lflow_port_group_handler);
>>
>>      engine_add_input(&en_sync_to_sb_addr_set, &en_nb_address_set,
>>                       sync_to_sb_addr_set_nb_address_set_handler);
>> @@ -202,7 +202,8 @@ void inc_proc_northd_init(struct ovsdb_idl_loop *nb,
>>      engine_add_input(&en_sync_to_sb_addr_set, &en_northd, NULL);
>>      engine_add_input(&en_sync_to_sb_addr_set, &en_sb_address_set, NULL);
>>
>> -    engine_add_input(&en_port_group, &en_nb_port_group, NULL);
>> +    engine_add_input(&en_port_group, &en_nb_port_group,
>> +                     port_group_nb_port_group_handler);
>>      engine_add_input(&en_port_group, &en_sb_port_group, NULL);
>>      /* No need for an explicit handler for northd changes.  Port changes
>>       * that affect port_groups trigger updates to the NB.Port_Group
>> @@ -287,6 +288,12 @@ void inc_proc_northd_init(struct ovsdb_idl_loop *nb,
>>                                  "sbrec_address_set_by_name",
>>                                  sbrec_address_set_by_name);
>>
>> +    struct ovsdb_idl_index *sbrec_port_group_by_name
>> +        = ovsdb_idl_index_create1(sb->idl, &sbrec_port_group_col_name);
>> +    engine_ovsdb_node_add_index(&en_sb_port_group,
>> +                                "sbrec_port_group_by_name",
>> +                                sbrec_port_group_by_name);
>> +
>>      struct ovsdb_idl_index *sbrec_fdb_by_dp_and_port
>>          = ovsdb_idl_index_create2(sb->idl, &sbrec_fdb_col_dp_key,
>>                                    &sbrec_fdb_col_port_key);
>> diff --git a/northd/ovn-northd.c b/northd/ovn-northd.c
>> index 4fa1b039ea..44385d604c 100644
>> --- a/northd/ovn-northd.c
>> +++ b/northd/ovn-northd.c
>> @@ -836,6 +836,10 @@ main(int argc, char *argv[])
>>          ovsdb_idl_omit_alert(ovnsb_idl_loop.idl,
>>                               &sbrec_multicast_group_columns[i]);
>>      }
>> +    for (size_t i = 0; i < SBREC_PORT_GROUP_N_COLUMNS; i++) {
>> +        ovsdb_idl_omit_alert(ovnsb_idl_loop.idl,
>> +                             &sbrec_port_group_columns[i]);
>> +    }
>>
>>      unixctl_command_register("sb-connection-status", "", 0, 0,
>>                               ovn_conn_show, ovnsb_idl_loop.idl);
>> diff --git a/tests/ovn-northd.at b/tests/ovn-northd.at
>> index 1a12513d7a..a04ba2b23f 100644
>> --- a/tests/ovn-northd.at
>> +++ b/tests/ovn-northd.at
>> @@ -8936,6 +8936,252 @@ AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE
>> inc-engine/show-stats sync_to_sb_a
>>  AT_CLEANUP
>>  ])
>>
>> +OVN_FOR_EACH_NORTHD_NO_HV([
>> +AT_SETUP([Port group incremental processing])
>> +ovn_start
>> +
>> +check ovn-nbctl ls-add sw1 \
>> +  -- lsp-add sw1 sw1.1     \
>> +  -- lsp-add sw1 sw1.2     \
>> +  -- lsp-add sw1 sw1.3     \
>> +  -- ls-add sw2            \
>> +  -- lsp-add sw2 sw2.1     \
>> +  -- lsp-add sw2 sw2.2     \
>> +  -- lsp-add sw2 sw2.3
>> +
>> +check ovn-nbctl --wait=sb sync
>> +sw1_key=$(fetch_column Datapath_Binding tunnel_key external_ids:name=sw1)
>> +sw2_key=$(fetch_column Datapath_Binding tunnel_key external_ids:name=sw2)
>> +
>> +check_acl_lflows() {
>> +AT_CHECK_UNQUOTED([ovn-sbctl lflow-list sw1 | grep ls_in_acl_eval | grep
>> eth.src==41:41:41:41:41:41 -c], [ignore], [dnl
>> +$1
>> +])
>> +AT_CHECK_UNQUOTED([ovn-sbctl lflow-list sw1 | grep ls_in_acl_eval | grep
>> eth.src==42:42:42:42:42:42 -c], [ignore], [dnl
>> +$2
>> +])
>> +AT_CHECK_UNQUOTED([ovn-sbctl lflow-list sw2 | grep ls_in_acl_eval | grep
>> eth.src==41:41:41:41:41:41 -c], [ignore], [dnl
>> +$3
>> +])
>> +AT_CHECK_UNQUOTED([ovn-sbctl lflow-list sw2 | grep ls_in_acl_eval | grep
>> eth.src==42:42:42:42:42:42 -c], [ignore], [dnl
>> +$4
>> +])
>> +}
>> +
>> +AS_BOX([Create new PG1 and PG2])
>> +check as northd ovn-appctl -t NORTHD_TYPE inc-engine/clear-stats
>> +check ovn-nbctl --wait=sb -- pg-add pg1 -- pg-add pg2
>> +dnl The northd node should not recompute, it should handle nb_global
>> update
>> +dnl though, therefore "compute: 1".
>> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
>> northd], [0], [dnl
>> +Node: northd
>> +- recompute:            0
>> +- compute:              1
>> +- abort:                0
>> +])
>> +dnl The port_group node recomputes every time a NB port group is
>> added/deleted.
>> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
>> port_group], [0], [dnl
>> +Node: port_group
>> +- recompute:            1
>> +- compute:              0
>> +- abort:                0
>> +])
>> +dnl The port_group node is an input for the lflow node.  Port_group
>> +dnl recompute/compute triggers lflow recompute.
>> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
>> lflow], [0], [dnl
>> +Node: lflow
>> +- recompute:            1
>> +- compute:              0
>> +- abort:                0
>> +])
>> +
>> +AS_BOX([Add ACLs on PG1 and PG2])
>> +check ovn-nbctl --wait=sb             \
>> +  -- acl-add pg1 from-lport 1 eth.src==41:41:41:41:41:41 allow \
>> +  -- acl-add pg2 from-lport 1 eth.src==42:42:42:42:42:42 allow
>> +
>> +AS_BOX([Add one port from the two switches to PG1])
>> +check as northd ovn-appctl -t NORTHD_TYPE inc-engine/clear-stats
>> +check ovn-nbctl --wait=sb         \
>> +  -- pg-set-ports pg1 sw1.1 sw2.1
>> +check_column "sw1.1" sb:Port_Group ports name="${sw1_key}_pg1"
>> +check_column "sw2.1" sb:Port_Group ports name="${sw2_key}_pg1"
>> +
>> +dnl The northd node should not recompute, it should handle nb_global
>> update
>> +dnl though, therefore "compute: 1".
>> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
>> northd], [0], [dnl
>> +Node: northd
>> +- recompute:            0
>> +- compute:              1
>> +- abort:                0
>> +])
>> +dnl The port_group node recomputes also every time a port from a new
>> switch
>> +dnl is added to the group.
>> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
>> port_group], [0], [dnl
>> +Node: port_group
>> +- recompute:            1
>> +- compute:              0
>> +- abort:                0
>> +])
>> +dnl The port_group node is an input for the lflow node.  Port_group
>> +dnl recompute/compute triggers lflow recompute.
>> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
>> lflow], [0], [dnl
>> +Node: lflow
>> +- recompute:            1
>> +- compute:              0
>> +- abort:                0
>> +])
>> +dnl Expect ACL1 on sw1 and sw2
>> +check_acl_lflows 1 0 1 0
>> +
>> +AS_BOX([Add one port from the two switches to PG2])
>> +check as northd ovn-appctl -t NORTHD_TYPE inc-engine/clear-stats
>> +check ovn-nbctl --wait=sb \
>> +  -- pg-set-ports pg2 sw1.2 sw2.2
>> +check_column "sw1.1" sb:Port_Group ports name="${sw1_key}_pg1"
>> +check_column "sw2.1" sb:Port_Group ports name="${sw2_key}_pg1"
>> +check_column "sw1.2" sb:Port_Group ports name="${sw1_key}_pg2"
>> +check_column "sw2.2" sb:Port_Group ports name="${sw2_key}_pg2"
>> +
>> +dnl The northd node should not recompute, it should handle nb_global
>> update
>> +dnl though, therefore "compute: 1".
>> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
>> northd], [0], [dnl
>> +Node: northd
>> +- recompute:            0
>> +- compute:              1
>> +- abort:                0
>> +])
>> +dnl The port_group node recomputes also every time a port from a new
>> switch
>> +dnl is added to the group.
>> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
>> port_group], [0], [dnl
>> +Node: port_group
>> +- recompute:            1
>> +- compute:              0
>> +- abort:                0
>> +])
>> +dnl The port_group node is an input for the lflow node.  Port_group
>> +dnl recompute/compute triggers lflow recompute (for ACLs).
>> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
>> lflow], [0], [dnl
>> +Node: lflow
>> +- recompute:            1
>> +- compute:              0
>> +- abort:                0
>> +])
>> +dnl Expect both ACLs on sw1 and sw2
>> +check_acl_lflows 1 1 1 1
>> +
>> +AS_BOX([Add one more port from the two switches to PG1 and PG2])
>> +check as northd ovn-appctl -t NORTHD_TYPE inc-engine/clear-stats
>> +check ovn-nbctl --wait=sb                     \
>> +  -- pg-set-ports pg1 sw1.1 sw2.1 sw1.3 sw2.3 \
>> +  -- pg-set-ports pg2 sw1.2 sw2.2 sw1.3 sw2.3
>> +check_column "sw1.1 sw1.3" sb:Port_Group ports name="${sw1_key}_pg1"
>> +check_column "sw2.1 sw2.3" sb:Port_Group ports name="${sw2_key}_pg1"
>> +check_column "sw1.2 sw1.3" sb:Port_Group ports name="${sw1_key}_pg2"
>> +check_column "sw2.2 sw2.3" sb:Port_Group ports name="${sw2_key}_pg2"
>> +
>> +dnl The northd node should not recompute, it should handle nb_global
>> update
>> +dnl though, therefore "compute: 1".
>> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
>> northd], [0], [dnl
>> +Node: northd
>> +- recompute:            0
>> +- compute:              1
>> +- abort:                0
>> +])
>> +dnl We did not change the set of switches a pg is applied to, there
>> should be
>> +dnl no recompute.
>> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
>> port_group], [0], [dnl
>> +Node: port_group
>> +- recompute:            0
>> +- compute:              1
>> +- abort:                0
>> +])
>> +dnl We did not change the set of switches a pg is applied to, there
>> should be
>> +dnl no recompute.
>> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
>> lflow], [0], [dnl
>> +Node: lflow
>> +- recompute:            0
>> +- compute:              1
>> +- abort:                0
>> +])
>> +dnl Expect both ACLs on sw1 and sw2
>> +check_acl_lflows 1 1 1 1
>> +
>> +AS_BOX([Remove the last port from PG1 and PG2])
>> +check as northd ovn-appctl -t NORTHD_TYPE inc-engine/clear-stats
>> +check ovn-nbctl --wait=sb         \
>> +  -- pg-set-ports pg1 sw1.1 sw2.1 \
>> +  -- pg-set-ports pg2 sw1.2 sw2.2
>> +check_column "sw1.1" sb:Port_Group ports name="${sw1_key}_pg1"
>> +check_column "sw2.1" sb:Port_Group ports name="${sw2_key}_pg1"
>> +check_column "sw1.2" sb:Port_Group ports name="${sw1_key}_pg2"
>> +check_column "sw2.2" sb:Port_Group ports name="${sw2_key}_pg2"
>> +
>> +dnl The northd node should not recompute, it should handle nb_global
>> update
>> +dnl though, therefore "compute: 1".
>> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
>> northd], [0], [dnl
>> +Node: northd
>> +- recompute:            0
>> +- compute:              1
>> +- abort:                0
>> +])
>> +dnl We did not change the set of switches a pg is applied to, there
>> should be
>> +dnl no recompute.
>> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
>> port_group], [0], [dnl
>> +Node: port_group
>> +- recompute:            0
>> +- compute:              1
>> +- abort:                0
>> +])
>> +dnl We did not change the set of switches a pg is applied to, there
>> should be
>> +dnl no recompute.
>> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
>> lflow], [0], [dnl
>> +Node: lflow
>> +- recompute:            0
>> +- compute:              1
>> +- abort:                0
>> +])
>> +dnl Expect both ACLs on sw1 and sw2
>> +check_acl_lflows 1 1 1 1
>> +
>> +AS_BOX([Remove the second port from PG1 and PG2])
>> +check as northd ovn-appctl -t NORTHD_TYPE inc-engine/clear-stats
>> +check ovn-nbctl --wait=sb         \
>> +  -- pg-set-ports pg1 sw1.1 \
>> +  -- pg-set-ports pg2 sw1.2
>> +check_column "sw1.1" sb:Port_Group ports name="${sw1_key}_pg1"
>> +check_column "sw1.2" sb:Port_Group ports name="${sw1_key}_pg2"
>> +
>> +dnl The northd node should not recompute, it should handle nb_global
>> update
>> +dnl though, therefore "compute: 1".
>> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
>> northd], [0], [dnl
>> +Node: northd
>> +- recompute:            0
>> +- compute:              1
>> +- abort:                0
>> +])
>> +dnl We did changed the set of switches a pg is applied to, there should be
>> +dnl a recompute.
>> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
>> port_group], [0], [dnl
>> +Node: port_group
>> +- recompute:            1
>> +- compute:              0
>> +- abort:                0
>> +])
>> +dnl We did changed the set of switches a pg is applied to, there should be
>> +dnl a recompute (for ACLs).
>> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
>> lflow], [0], [dnl
>> +Node: lflow
>> +- recompute:            1
>> +- compute:              0
>> +- abort:                0
>> +])
>> +dnl Expect both ACLs on sw1 and not on sw2.
>> +check_acl_lflows 1 1 0 0
>> +
>> +AT_CLEANUP
>> +])
>> +
>>  OVN_FOR_EACH_NORTHD([
>>  AT_SETUP([Check default drop])
>>  AT_KEYWORDS([drop])
>>
>>
> With that addressed:
> 
> Acked-by: Ales Musil <amusil@redhat.com>
Dumitru Ceara Aug. 30, 2023, 10:40 a.m. UTC | #4
On 8/24/23 08:18, Han Zhou wrote:
> On Thu, Aug 10, 2023 at 5:45 AM Dumitru Ceara <dceara@redhat.com> wrote:
>>
>> It's similar to the processing we do for address sets.  There's a bit
>> more mechanics involved due to the fact that we need to split NB port
>> groups per datapath.
>>
>> We currently only partially implement incremental processing of
>> port_group changes in the lflow node.  That is, we deal with the case
>> when the sets of "switches per port group" doesn't change.  In that
>> specific case ACL lflows don't need to be reprocessed.
>>
>> In a synthetic benchmark that created (in this order):
>> - 500 switches
>> - 2000 port groups
>> - 4 ACLs per port group
>> - 10000 ports distributed equally between the switches and port groups
>>
>> we measured the following ovn-northd CPU usage:
>>
>>   +-------------------------+------------+--------------------+
>>   | Incremental processing? | --wait=sb? | northd avg cpu (%) |
>>   +-------------------------+------------+--------------------+
>>   |           N             |     Y      |        84.2        |
>>   +-------------------------+------------+--------------------+
>>   |           Y             |     Y      |        41.5        |
>>   +-------------------------+------------+--------------------+
>>   |           N             |     N      |        93.2        |
>>   +-------------------------+------------+--------------------+
>>   |           Y             |     N      |        53.6        |
>>   +-------------------------+------------+--------------------+
>>
>> where '--wait=sb' set to 'Y'  means the benchmark was waiting for the
>> port and port group operations to be propagated to the Southbound DB
>> before continuing to the next operation.
>>
>> Reported-at: https://bugzilla.redhat.com/show_bug.cgi?id=2228162
>> Signed-off-by: Dumitru Ceara <dceara@redhat.com>
> 
> Thanks Dumitru for the improvement! Please see my comments below.
> 

Hi Han,

Thanks for the review!

>> ---
>>  northd/en-lflow.c        |   17 ++
>>  northd/en-lflow.h        |    1
>>  northd/en-port-group.c   |  451
> ++++++++++++++++++++++++++++++++++++++++------
>>  northd/en-port-group.h   |   36 +++-
>>  northd/inc-proc-northd.c |   13 +
>>  northd/ovn-northd.c      |    4
>>  tests/ovn-northd.at      |  246 +++++++++++++++++++++++++
>>  7 files changed, 708 insertions(+), 60 deletions(-)
>>
>> diff --git a/northd/en-lflow.c b/northd/en-lflow.c
>> index 7f6a7872b2..1321f79036 100644
>> --- a/northd/en-lflow.c
>> +++ b/northd/en-lflow.c
>> @@ -119,6 +119,23 @@ lflow_northd_handler(struct engine_node *node,
>>      return true;
>>  }
>>
>> +bool
>> +lflow_port_group_handler(struct engine_node *node, void *data OVS_UNUSED)
>> +{
>> +    struct port_group_data *pg_data =
>> +        engine_get_input_data("port_group", node);
>> +
>> +    /* If the set of switches per port group didn't change then there's
> no
>> +     * need to reprocess lflows.  Otherwise, there might be a need to add
>> +     * port-group ACLs to new switches. */
> 
> To be more accurate, the comment should say "there might be a need to
> add/delete port-group ACLs to/from switches."
> 

Ack.

>> +    if (!pg_data->ls_port_groups_sets_unchanged) {
> 
> nit: it would be a little more natural to name the field as
> ls_port_groups_sets_changed and use positive check in the if condition.
> 

Sure, I changed it like that.

>> +        return false;
>> +    }
>> +
>> +    engine_set_node_state(node, EN_UPDATED);
>> +    return true;
>> +}
>> +
>>  void *en_lflow_init(struct engine_node *node OVS_UNUSED,
>>                       struct engine_arg *arg OVS_UNUSED)
>>  {
>> diff --git a/northd/en-lflow.h b/northd/en-lflow.h
>> index 5e3fbc25e3..5417b2faff 100644
>> --- a/northd/en-lflow.h
>> +++ b/northd/en-lflow.h
>> @@ -13,5 +13,6 @@ void en_lflow_run(struct engine_node *node, void *data);
>>  void *en_lflow_init(struct engine_node *node, struct engine_arg *arg);
>>  void en_lflow_cleanup(void *data);
>>  bool lflow_northd_handler(struct engine_node *, void *data);
>> +bool lflow_port_group_handler(struct engine_node *, void *data);
>>
>>  #endif /* EN_LFLOW_H */
>> diff --git a/northd/en-port-group.c b/northd/en-port-group.c
>> index 2c36410246..6902695a01 100644
>> --- a/northd/en-port-group.c
>> +++ b/northd/en-port-group.c
>> @@ -33,15 +33,46 @@ static struct ls_port_group *ls_port_group_create(
>>  static void ls_port_group_destroy(struct ls_port_group_table *,
>>                                    struct ls_port_group *);
>>
>> +static bool ls_port_group_process(
>> +    struct ls_port_group_table *,
>> +    struct port_group_to_ls_table *,
>> +    const struct hmap *ls_ports,
>> +    const struct nbrec_port_group *,
>> +    struct hmapx *updated_ls_port_groups
>> +);
> 
> nit: It would be better to use the same coding style as the below
> prototype, i.e. don't put the last ");" in a separate line.
> 

OK.

>> +
>> +static void ls_port_group_record_clear(
>> +    struct ls_port_group_table *,
>> +    struct port_group_to_ls *,
>> +    struct hmapx *updated_ls_port_groups);
>> +static void ls_port_group_record_prune(struct ls_port_group *);
>> +
>>  static struct ls_port_group_record *ls_port_group_record_add(
>>      struct ls_port_group *,
>>      const struct nbrec_port_group *,
>>      const char *port_name);
>>
>> +static struct ls_port_group_record *ls_port_group_record_find(
>> +    struct ls_port_group *, const struct nbrec_port_group *nb_pg);
>> +
>>  static void ls_port_group_record_destroy(
>>      struct ls_port_group *,
>>      struct ls_port_group_record *);
>>
>> +static struct port_group_to_ls *port_group_to_ls_create(
>> +    struct port_group_to_ls_table *,
>> +    const struct nbrec_port_group *);
>> +static void port_group_to_ls_destroy(struct port_group_to_ls_table *,
>> +                                     struct port_group_to_ls *);
>> +
>> +static void update_sb_port_group(struct sorted_array *nb_ports,
>> +                                 const struct sbrec_port_group *sb_pg);
>> +static void sync_port_group(struct ovsdb_idl_txn *, const char
> *sb_pg_name,
>> +                            struct sorted_array *ports,
>> +                            struct shash *sb_port_groups);
>> +static const struct sbrec_port_group *sb_port_group_lookup_by_name(
>> +    struct ovsdb_idl_index *sbrec_port_group_by_name, const char *name);
>> +
>>  void
>>  ls_port_group_table_init(struct ls_port_group_table *table)
>>  {
>> @@ -82,39 +113,16 @@ ls_port_group_table_find(const struct
> ls_port_group_table *table,
>>  }
>>
>>  void
>> -ls_port_group_table_build(struct ls_port_group_table *ls_port_groups,
>> -                          const struct nbrec_port_group_table *pg_table,
>> -                          const struct hmap *ls_ports)
>> +ls_port_group_table_build(
>> +    struct ls_port_group_table *ls_port_groups,
>> +    struct port_group_to_ls_table *port_group_to_switches,
>> +    const struct nbrec_port_group_table *pg_table,
>> +    const struct hmap *ls_ports)
>>  {
>>      const struct nbrec_port_group *nb_pg;
>>      NBREC_PORT_GROUP_TABLE_FOR_EACH (nb_pg, pg_table) {
>> -        for (size_t i = 0; i < nb_pg->n_ports; i++) {
>> -            const char *port_name = nb_pg->ports[i]->name;
>> -            const struct ovn_datapath *od =
>> -                northd_get_datapath_for_port(ls_ports, port_name);
>> -
>> -            if (!od) {
>> -                static struct vlog_rate_limit rl =
> VLOG_RATE_LIMIT_INIT(1, 1);
>> -                VLOG_ERR_RL(&rl, "lport %s in port group %s not found.",
>> -                            port_name, nb_pg->name);
>> -                continue;
>> -            }
>> -
>> -            if (!od->nbs) {
>> -                static struct vlog_rate_limit rl =
> VLOG_RATE_LIMIT_INIT(1, 1);
>> -                VLOG_WARN_RL(&rl, "lport %s in port group %s has no
> lswitch.",
>> -                             nb_pg->ports[i]->name,
>> -                             nb_pg->name);
>> -                continue;
>> -            }
>> -
>> -            struct ls_port_group *ls_pg =
>> -                ls_port_group_table_find(ls_port_groups, od->nbs);
>> -            if (!ls_pg) {
>> -                ls_pg = ls_port_group_create(ls_port_groups, od->nbs,
> od->sb);
>> -            }
>> -            ls_port_group_record_add(ls_pg, nb_pg, port_name);
>> -        }
>> +        ls_port_group_process(ls_port_groups, port_group_to_switches,
>> +                              ls_ports, nb_pg, NULL);
>>      }
>>  }
>>
>> @@ -145,18 +153,11 @@ ls_port_group_table_sync(
>>              get_sb_port_group_name(ls_pg_rec->nb_pg->name,
>>                                     ls_pg->sb_datapath_key,
>>                                     &sb_name);
>> -            sb_port_group = shash_find_and_delete(&sb_port_groups,
>> -                                                  ds_cstr(&sb_name));
>> -            if (!sb_port_group) {
>> -                sb_port_group = sbrec_port_group_insert(ovnsb_txn);
>> -                sbrec_port_group_set_name(sb_port_group,
> ds_cstr(&sb_name));
>> -            }
>> -
>> -            const char **nb_port_names = sset_array(&ls_pg_rec->ports);
>> -            sbrec_port_group_set_ports(sb_port_group,
>> -                                       nb_port_names,
>> -                                       sset_count(&ls_pg_rec->ports));
>> -            free(nb_port_names);
>> +            struct sorted_array ports =
>> +                sorted_array_from_sset(&ls_pg_rec->ports);
>> +            sync_port_group(ovnsb_txn, ds_cstr(&sb_name),
>> +                            &ports, &sb_port_groups);
>> +            sorted_array_destroy(&ports);
>>          }
>>      }
>>      ds_destroy(&sb_name);
>> @@ -201,31 +202,165 @@ ls_port_group_destroy(struct ls_port_group_table
> *ls_port_groups,
>>      }
>>  }
>>
>> +/* Process a NB.Port_Group record and stores any updated ls_port_groups
>> + * in updated_ls_port_groups.  Returns true if a new ls_port_group had
>> + * to be created or destroyed.
>> + */
>> +static bool
>> +ls_port_group_process(struct ls_port_group_table *ls_port_groups,
>> +                      struct port_group_to_ls_table
> *port_group_to_switches,
>> +                      const struct hmap *ls_ports,
>> +                      const struct nbrec_port_group *nb_pg,
>> +                      struct hmapx *updated_ls_port_groups)
>> +{
>> +    struct hmapx cleared_ls_port_groups =
>> +        HMAPX_INITIALIZER(&cleared_ls_port_groups);
>> +    bool ls_port_group_created = false;
>> +
>> +    struct port_group_to_ls *pg_ls =
>> +        port_group_to_ls_table_find(port_group_to_switches, nb_pg);
>> +    if (!pg_ls) {
>> +        pg_ls = port_group_to_ls_create(port_group_to_switches, nb_pg);
>> +    } else {
>> +        /* Clear all old records corresponding to this port group; we'll
>> +         * reprocess it below. */
>> +        ls_port_group_record_clear(ls_port_groups, pg_ls,
>> +                                   &cleared_ls_port_groups);
> 
> When the last port from a LS is removed from a PG, we should remove the LS
> from the port_group_to_switches record's "switches" field. I didn't find
> this logic in the code. Did I miss anything?
> 

You're right, this is a bug.  The idea was to return false from the I-P
handler if the set of SB per-ls records would change.  I currently only
do that if all the PGs associated to a LS are removed.  I'll fix it and
improve the test.

For the "add" case, that's taken care of by the fact that we'll be
missing SB.Port_Group records.

>> +    }
>> +
>> +    for (size_t i = 0; i < nb_pg->n_ports; i++) {
>> +        const char *port_name = nb_pg->ports[i]->name;
>> +        const struct ovn_datapath *od =
>> +            northd_get_datapath_for_port(ls_ports, port_name);
>> +
>> +        if (!od) {
>> +            static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1,
> 1);
>> +            VLOG_ERR_RL(&rl, "lport %s in port group %s not found.",
>> +                        port_name, nb_pg->name);
>> +            continue;
>> +        }
>> +
>> +        if (!od->nbs) {
>> +            static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1,
> 1);
>> +            VLOG_WARN_RL(&rl, "lport %s in port group %s has no
> lswitch.",
>> +                         nb_pg->ports[i]->name,
>> +                         nb_pg->name);
>> +            continue;
>> +        }
>> +
>> +        struct ls_port_group *ls_pg =
>> +            ls_port_group_table_find(ls_port_groups, od->nbs);
>> +        if (!ls_pg) {
>> +            ls_pg = ls_port_group_create(ls_port_groups, od->nbs,
> od->sb);
>> +            ls_port_group_created = true;
> 
> What if a ls_pg has existed, and now a new PG is associated with the LS? I
> think we should identify such cases, too, and return false.
> 

We currently deal with this when syncing the SB port groups.  The PG
won't be found then and we'll return false from the handler therefore
trigerring a recompute.

>> +        }
>> +        ls_port_group_record_add(ls_pg, nb_pg, port_name);
>> +        hmapx_add(&pg_ls->switches,
>> +                  CONST_CAST(struct nbrec_logical_switch *, od->nbs));
>> +        if (updated_ls_port_groups) {
>> +            hmapx_add(updated_ls_port_groups, ls_pg);
>> +        }
>> +    }
>> +
>> +    bool ls_port_group_destroyed = false;
>> +    struct hmapx_node *node;
>> +    HMAPX_FOR_EACH (node, &cleared_ls_port_groups) {
>> +        struct ls_port_group *ls_pg = node->data;
>> +
>> +        ls_port_group_record_prune(ls_pg);
>> +
>> +        if (hmap_is_empty(&ls_pg->nb_pgs)) {
>> +            ls_port_group_destroy(ls_port_groups, ls_pg);
>> +            ls_port_group_destroyed = true;
> 
> Similar to the above, what if a PG is removed from a ls_pg but the ls_pg
> still has some other PGs? Should we return false, too?
> 

Right, I'll fix this in v2.

>> +        }
>> +    }
>> +    hmapx_destroy(&cleared_ls_port_groups);
>> +
>> +    return ls_port_group_created || ls_port_group_destroyed;
>> +}
>> +
>> +/* Destroys all the struct ls_port_group_record that might be associated
> to
>> + * northbound database logical switches.  Stores ls_port_groups that
> became
>> + * were updated in the 'updated_ls_port_groups' map.
> 
> nit:
> s/became were/were
> s/updated/cleared
> 

Ack.

>> + */
>> +static void
>> +ls_port_group_record_clear(struct ls_port_group_table *ls_port_groups,
>> +                           struct port_group_to_ls *pg_ls,
>> +                           struct hmapx *cleared_ls_port_groups)
>> +{
>> +    struct hmapx_node *node;
>> +
>> +    HMAPX_FOR_EACH (node, &pg_ls->switches) {
>> +        const struct nbrec_logical_switch *nbs = node->data;
>> +
>> +        struct ls_port_group *ls_pg =
>> +            ls_port_group_table_find(ls_port_groups, nbs);
>> +        if (!ls_pg) {
> 
> Should it be a bug if the ls_pg is not found here? Shall we assert?
> 

I think you're right, I'll change this into an assert.

>> +            continue;
>> +        }
>> +
>> +        /* Clear ports in the port group record. */
>> +        struct ls_port_group_record *ls_pg_rec =
>> +            ls_port_group_record_find(ls_pg, pg_ls->nb_pg);
>> +        if (!ls_pg_rec) {
> 
> Same as above.
> 

Ack.

>> +            continue;
>> +        }
>> +
>> +        sset_clear(&ls_pg_rec->ports);
>> +        hmapx_add(cleared_ls_port_groups, ls_pg);
>> +    }
>> +}
>> +
>> +static void
>> +ls_port_group_record_prune(struct ls_port_group *ls_pg)
>> +{
>> +    struct ls_port_group_record *ls_pg_rec;
>> +
>> +    HMAP_FOR_EACH_SAFE (ls_pg_rec, key_node, &ls_pg->nb_pgs) {
>> +        if (sset_is_empty(&ls_pg_rec->ports)) {
>> +            ls_port_group_record_destroy(ls_pg, ls_pg_rec);
>> +        }
>> +    }
>> +}
>> +
>>  static struct ls_port_group_record *
>>  ls_port_group_record_add(struct ls_port_group *ls_pg,
>>                           const struct nbrec_port_group *nb_pg,
>>                           const char *port_name)
>>  {
>> -    struct ls_port_group_record *ls_pg_rec = NULL;
>> +    struct ls_port_group_record *ls_pg_rec =
>> +        ls_port_group_record_find(ls_pg, nb_pg);
>>      size_t hash = uuid_hash(&nb_pg->header_.uuid);
>>
>> -    HMAP_FOR_EACH_WITH_HASH (ls_pg_rec, key_node, hash, &ls_pg->nb_pgs) {
>> -        if (ls_pg_rec->nb_pg == nb_pg) {
>> -            goto done;
>> -        }
>> +    if (!ls_pg_rec) {
>> +        ls_pg_rec = xzalloc(sizeof *ls_pg_rec);
>> +        *ls_pg_rec = (struct ls_port_group_record) {
>> +            .nb_pg = nb_pg,
>> +            .ports = SSET_INITIALIZER(&ls_pg_rec->ports),
>> +        };
>> +        hmap_insert(&ls_pg->nb_pgs, &ls_pg_rec->key_node, hash);
>>      }
>>
>> -    ls_pg_rec = xzalloc(sizeof *ls_pg_rec);
>> -    *ls_pg_rec = (struct ls_port_group_record) {
>> -        .nb_pg = nb_pg,
>> -        .ports = SSET_INITIALIZER(&ls_pg_rec->ports),
>> -    };
>> -    hmap_insert(&ls_pg->nb_pgs, &ls_pg_rec->key_node, hash);
>> -done:
>>      sset_add(&ls_pg_rec->ports, port_name);
>>      return ls_pg_rec;
>>  }
>>
>> +static struct ls_port_group_record *
>> +ls_port_group_record_find(struct ls_port_group *ls_pg,
>> +                          const struct nbrec_port_group *nb_pg)
>> +{
>> +    size_t hash = uuid_hash(&nb_pg->header_.uuid);
>> +    struct ls_port_group_record *ls_pg_rec;
>> +
>> +    HMAP_FOR_EACH_WITH_HASH (ls_pg_rec, key_node, hash, &ls_pg->nb_pgs) {
>> +        if (ls_pg_rec->nb_pg == nb_pg) {
>> +            return ls_pg_rec;
>> +        }
>> +    }
>> +    return NULL;
>> +}
>> +
>> +
>>  static void
>>  ls_port_group_record_destroy(struct ls_port_group *ls_pg,
>>                               struct ls_port_group_record *ls_pg_rec)
>> @@ -237,6 +372,71 @@ ls_port_group_record_destroy(struct ls_port_group
> *ls_pg,
>>      }
>>  }
>>
>> +void
>> +port_group_to_ls_table_init(struct port_group_to_ls_table *table)
>> +{
>> +    *table = (struct port_group_to_ls_table) {
>> +        .entries = HMAP_INITIALIZER(&table->entries),
>> +    };
>> +}
>> +
>> +void
>> +port_group_to_ls_table_clear(struct port_group_to_ls_table *table)
>> +{
>> +    struct port_group_to_ls *pg_ls;
>> +    HMAP_FOR_EACH_SAFE (pg_ls, key_node, &table->entries) {
>> +        port_group_to_ls_destroy(table, pg_ls);
>> +    }
>> +}
>> +
>> +void
>> +port_group_to_ls_table_destroy(struct port_group_to_ls_table *table)
>> +{
>> +    port_group_to_ls_table_clear(table);
>> +    hmap_destroy(&table->entries);
>> +}
>> +
>> +struct port_group_to_ls *
>> +port_group_to_ls_table_find(const struct port_group_to_ls_table *table,
>> +                            const struct nbrec_port_group *nb_pg)
>> +{
>> +    struct port_group_to_ls *pg_ls;
>> +
>> +    HMAP_FOR_EACH_WITH_HASH (pg_ls, key_node,
> uuid_hash(&nb_pg->header_.uuid),
>> +                             &table->entries) {
>> +        if (nb_pg == pg_ls->nb_pg) {
>> +            return pg_ls;
>> +        }
>> +    }
>> +    return NULL;
>> +}
>> +
>> +static struct port_group_to_ls *
>> +port_group_to_ls_create(struct port_group_to_ls_table *table,
>> +                        const struct nbrec_port_group *nb_pg)
>> +{
>> +    struct port_group_to_ls *pg_ls = xmalloc(sizeof *pg_ls);
>> +
>> +    *pg_ls = (struct port_group_to_ls) {
>> +        .nb_pg = nb_pg,
>> +        .switches = HMAPX_INITIALIZER(&pg_ls->switches),
>> +    };
>> +    hmap_insert(&table->entries, &pg_ls->key_node,
>> +                uuid_hash(&nb_pg->header_.uuid));
>> +    return pg_ls;
>> +}
>> +
>> +static void
>> +port_group_to_ls_destroy(struct port_group_to_ls_table *table,
>> +                         struct port_group_to_ls *pg_ls)
>> +{
>> +    if (pg_ls) {
>> +        hmapx_destroy(&pg_ls->switches);
>> +        hmap_remove(&table->entries, &pg_ls->key_node);
>> +        free(pg_ls);
>> +    }
>> +}
>> +
>>  /* Incremental processing implementation. */
>>  static struct port_group_input
>>  port_group_get_input_data(struct engine_node *node)
>> @@ -259,6 +459,7 @@ en_port_group_init(struct engine_node *node
> OVS_UNUSED,
>>      struct port_group_data *pg_data = xmalloc(sizeof *pg_data);
>>
>>      ls_port_group_table_init(&pg_data->ls_port_groups);
>> +    port_group_to_ls_table_init(&pg_data->port_groups_to_ls);
>>      return pg_data;
>>  }
>>
>> @@ -268,6 +469,15 @@ en_port_group_cleanup(void *data_)
>>      struct port_group_data *data = data_;
>>
>>      ls_port_group_table_destroy(&data->ls_port_groups);
>> +    port_group_to_ls_table_destroy(&data->port_groups_to_ls);
>> +}
>> +
>> +void
>> +en_port_group_clear_tracked_data(void *data_)
>> +{
>> +    struct port_group_data *data = data_;
>> +
>> +    data->ls_port_groups_sets_unchanged = false;
>>  }
>>
>>  void
>> @@ -280,7 +490,10 @@ en_port_group_run(struct engine_node *node, void
> *data_)
>>      stopwatch_start(PORT_GROUP_RUN_STOPWATCH_NAME, time_msec());
>>
>>      ls_port_group_table_clear(&data->ls_port_groups);
>> +    port_group_to_ls_table_clear(&data->port_groups_to_ls);
>> +
>>      ls_port_group_table_build(&data->ls_port_groups,
>> +                              &data->port_groups_to_ls,
>>                                input_data.nbrec_port_group_table,
>>                                input_data.ls_ports);
>>
>> @@ -291,3 +504,133 @@ en_port_group_run(struct engine_node *node, void
> *data_)
>>      stopwatch_stop(PORT_GROUP_RUN_STOPWATCH_NAME, time_msec());
>>      engine_set_node_state(node, EN_UPDATED);
>>  }
>> +
>> +bool
>> +port_group_nb_port_group_handler(struct engine_node *node, void *data_)
>> +{
>> +    struct port_group_input input_data = port_group_get_input_data(node);
>> +    struct port_group_data *data = data_;
>> +    bool success = true;
>> +
>> +    const struct nbrec_port_group_table *nb_pg_table =
>> +        EN_OVSDB_GET(engine_get_input("NB_port_group", node));
>> +    const struct nbrec_port_group *nb_pg;
>> +
>> +    /* Return false if a port group is created or deleted.
>> +     * Handle I-P for only updated port groups. */
>> +    NBREC_PORT_GROUP_TABLE_FOR_EACH_TRACKED (nb_pg, nb_pg_table) {
>> +        if (nbrec_port_group_is_new(nb_pg) ||
>> +                nbrec_port_group_is_deleted(nb_pg)) {
>> +            return false;
>> +        }
>> +    }
>> +
>> +    struct hmapx updated_ls_port_groups =
>> +        HMAPX_INITIALIZER(&updated_ls_port_groups);
>> +
>> +    NBREC_PORT_GROUP_TABLE_FOR_EACH_TRACKED (nb_pg, nb_pg_table) {
>> +        /* Newly created port groups can't be incrementally processed;
>> +         * the rest yes. */
> 
> The comment here is incorrect and irrelevant, because apparently newly
> created port groups and deleted port groups are both excluded in the
> previous loop.
> 

OK.

>> +        if (ls_port_group_process(&data->ls_port_groups,
>> +                                  &data->port_groups_to_ls,
>> +                                  input_data.ls_ports,
>> +                                  nb_pg, &updated_ls_port_groups)) {
>> +            success = false;
>> +            break;
> 
> Related to my comments above in the ls_port_group_process, I think the goal
> here is to tell if there are PG changes that would impact ACL related
> lflows in any logical switches. The current ls_port_group_process returns
> true only if ls_pg is created or destroyed, but didn't detect the cases
> when:
> 1. a new PG is added to an existing ls_pg, or
> 2. when a PG is deleted from a ls_pg but the ls_pg still contains other PGs
> and so was not destroyed.
> 
> For these cases, we should set data->ls_port_groups_sets_unchanged to
> false, so that in lflow node we can fallback to compute.
> Case 1 still works in current implementation, but it is because in the
> below code when updating SB it would find that the SB PG for the new ls_pg
> doesn't exist and it fallback to recompute (while the SB updating logic
> could actually incrementally create the PG in SB - see comment below).
> Case 2 doesn't work with the current implementation, I think. And the
> scenario is in fact not covered in the test case.
> 

My initial goal was to cover both cases and fall back to recompute.
I'll fix that in v2.

>> +        }
>> +    }
>> +
>> +    /* If changes have been successfully processed incrementally then
> update
>> +     * the SB too. */
>> +    if (success) {
> 
> If ls_port_group_process returns false, it only means that there may be ACL
> related flows that need to be updated for the related logical switches, but
> the port_group node still can be incrementally processed. The
> data->ls_port_groups_sets_unchanged == false is sufficient to indicate that
> the lflow node needs to recompute.
> 
>> +        struct ovsdb_idl_index *sbrec_port_group_by_name =
>> +            engine_ovsdb_node_get_index(
>> +                    engine_get_input("SB_port_group", node),
>> +                    "sbrec_port_group_by_name");
>> +        struct ds sb_pg_name = DS_EMPTY_INITIALIZER;
>> +
>> +        struct hmapx_node *updated_node;
>> +        HMAPX_FOR_EACH (updated_node, &updated_ls_port_groups) {
>> +            const struct ls_port_group *ls_pg = updated_node->data;
>> +            struct ls_port_group_record *ls_pg_rec;
>> +
>> +            HMAP_FOR_EACH (ls_pg_rec, key_node, &ls_pg->nb_pgs) {
>> +                get_sb_port_group_name(ls_pg_rec->nb_pg->name,
>> +                                        ls_pg->sb_datapath_key,
>> +                                        &sb_pg_name);
>> +
>> +                const struct sbrec_port_group *sb_pg =
>> +
>  sb_port_group_lookup_by_name(sbrec_port_group_by_name,
>> +                                                 ds_cstr(&sb_pg_name));
>> +                if (!sb_pg) {
>> +                    success = false;
> 
> If sb_pg is not found, it can still be incrementally created, and there is
> no need to recompute in this node.
> 

It's used to determine that a switch got a new port group.  I replied
above to one of your earlier comments.

>> +                    break;
>> +                }
>> +                struct sorted_array nb_ports =
>> +                    sorted_array_from_sset(&ls_pg_rec->ports);
>> +                update_sb_port_group(&nb_ports, sb_pg);
>> +                sorted_array_destroy(&nb_ports);
>> +            }
>> +        }
>> +        ds_destroy(&sb_pg_name);
> 
> I expect this function to handle the deletion of the SB port_groups that
> are no longer needed, but I didn't see such logic. Did I miss anything?
> 

The bug I had made it that those were not always cleared (your case 2
above).  I'll fix that in v2 and (similar to what we do for
SB.address_set) whenever we have to create or delete a SB.port_group
we'll trigger a recompute of the port_group node.

>> +    }
>> +
>> +    data->ls_port_groups_sets_unchanged = success;
>> +    engine_set_node_state(node, EN_UPDATED);
>> +    hmapx_destroy(&updated_ls_port_groups);
>> +    return success;
>> +}
>> +
>> +static void
>> +sb_port_group_apply_diff(const void *arg, const char *item, bool add)
>> +{
>> +    const struct sbrec_port_group *pg = arg;
>> +    if (add) {
>> +        sbrec_port_group_update_ports_addvalue(pg, item);
>> +    } else {
>> +        sbrec_port_group_update_ports_delvalue(pg, item);
>> +    }
>> +}
>> +
>> +static void
>> +update_sb_port_group(struct sorted_array *nb_ports,
>> +                     const struct sbrec_port_group *sb_pg)
>> +{
>> +    struct sorted_array sb_ports = sorted_array_from_dbrec(sb_pg, ports);
>> +    sorted_array_apply_diff(nb_ports, &sb_ports,
>> +                            sb_port_group_apply_diff, sb_pg);
>> +    sorted_array_destroy(&sb_ports);
>> +}
>> +
>> +static void
>> +sync_port_group(struct ovsdb_idl_txn *ovnsb_txn, const char *sb_pg_name,
>> +                struct sorted_array *ports,
>> +                struct shash *sb_port_groups)
>> +{
>> +    const struct sbrec_port_group *sb_port_group =
>> +        shash_find_and_delete(sb_port_groups, sb_pg_name);
>> +    if (!sb_port_group) {
>> +        sb_port_group = sbrec_port_group_insert(ovnsb_txn);
>> +        sbrec_port_group_set_name(sb_port_group, sb_pg_name);
>> +        sbrec_port_group_set_ports(sb_port_group, ports->arr, ports->n);
>> +    } else {
>> +        update_sb_port_group(ports, sb_port_group);
>> +    }
>> +}
>> +
>> +/* Finds and returns the port group set with the given 'name', or NULL
>> + * if no such port group exists. */
>> +static const struct sbrec_port_group *
>> +sb_port_group_lookup_by_name(struct ovsdb_idl_index
> *sbrec_port_group_by_name,
>> +                             const char *name)
>> +{
>> +    struct sbrec_port_group *target = sbrec_port_group_index_init_row(
>> +        sbrec_port_group_by_name);
>> +    sbrec_port_group_index_set_name(target, name);
>> +
>> +    struct sbrec_port_group *retval = sbrec_port_group_index_find(
>> +        sbrec_port_group_by_name, target);
>> +
>> +    sbrec_port_group_index_destroy_row(target);
>> +    return retval;
>> +}
>> diff --git a/northd/en-port-group.h b/northd/en-port-group.h
>> index 5cbf6c6c4a..c3975f64ee 100644
>> --- a/northd/en-port-group.h
>> +++ b/northd/en-port-group.h
>> @@ -18,6 +18,7 @@
>>
>>  #include <stdint.h>
>>
>> +#include "lib/hmapx.h"
>>  #include "lib/inc-proc-eng.h"
>>  #include "lib/ovn-nb-idl.h"
>>  #include "lib/ovn-sb-idl.h"
>> @@ -54,9 +55,33 @@ struct ls_port_group *ls_port_group_table_find(
>>      const struct ls_port_group_table *,
>>      const struct nbrec_logical_switch *);
>>
>> -void ls_port_group_table_build(struct ls_port_group_table
> *ls_port_groups,
>> -                               const struct nbrec_port_group_table *,
>> -                               const struct hmap *ls_ports);
>> +/* Per port group map of datapaths with ports in the group. */
>> +struct port_group_to_ls_table {
>> +    struct hmap entries; /* Stores struct port_group_to_ls. */
>> +};
>> +
>> +struct port_group_to_ls {
>> +    struct hmap_node key_node; /* Index on 'pg->header_.uuid'. */
>> +
>> +    const struct nbrec_port_group *nb_pg;
>> +
>> +    /* Map of 'struct nbrec_logical_switch *' with ports in the group. */
>> +    struct hmapx switches;
>> +};
>> +
>> +void port_group_to_ls_table_init(struct port_group_to_ls_table *);
>> +void port_group_to_ls_table_clear(struct port_group_to_ls_table *);
>> +void port_group_to_ls_table_destroy(struct port_group_to_ls_table *);
>> +
>> +struct port_group_to_ls *port_group_to_ls_table_find(
>> +    const struct port_group_to_ls_table *,
>> +    const struct nbrec_port_group *);
>> +
>> +void ls_port_group_table_build(
>> +    struct ls_port_group_table *ls_port_groups,
>> +    struct port_group_to_ls_table *port_group_to_switches,
> 
> nit: The variable name for the same struct is port_groups_to_ls in the
> struct port_group_data. Better to be consistent.
> 

Sure, I'll change it.

>> +    const struct nbrec_port_group_table *,
>> +    const struct hmap *ls_ports);
>>  void ls_port_group_table_sync(const struct ls_port_group_table
> *ls_port_groups,
>>                                const struct sbrec_port_group_table *,
>>                                struct ovsdb_idl_txn *ovnsb_txn);
>> @@ -75,10 +100,15 @@ struct port_group_input {
>>
>>  struct port_group_data {
>>      struct ls_port_group_table ls_port_groups;
>> +    struct port_group_to_ls_table port_groups_to_ls;
> 
> nit: better to use consistent naming convention. For the struct name:
> ls_port_group_table v.s. port_group_ls_table, or ls_to_port_group_table
> v.s. port_group_to_ls_table.
> And for the variable name: ls_port_groups v.s. port_group_lses
> 

I didn't like "lses" as it didn't sound like a real plural but I guess
it's better to be consistent indeed.  I'll do that.

>> +    bool ls_port_groups_sets_unchanged;
>>  };
>>
>>  void *en_port_group_init(struct engine_node *, struct engine_arg *);
>>  void en_port_group_cleanup(void *data);
>> +void en_port_group_clear_tracked_data(void *data);
>>  void en_port_group_run(struct engine_node *, void *data);
>>
>> +bool port_group_nb_port_group_handler(struct engine_node *, void *data);
>> +
>>  #endif /* EN_PORT_GROUP_H */
>> diff --git a/northd/inc-proc-northd.c b/northd/inc-proc-northd.c
>> index 6d5f9e8d16..bd598ba5e2 100644
>> --- a/northd/inc-proc-northd.c
>> +++ b/northd/inc-proc-northd.c
>> @@ -137,7 +137,7 @@ static ENGINE_NODE(mac_binding_aging_waker,
> "mac_binding_aging_waker");
>>  static ENGINE_NODE(northd_output, "northd_output");
>>  static ENGINE_NODE(sync_to_sb, "sync_to_sb");
>>  static ENGINE_NODE(sync_to_sb_addr_set, "sync_to_sb_addr_set");
>> -static ENGINE_NODE(port_group, "port_group");
>> +static ENGINE_NODE_WITH_CLEAR_TRACK_DATA(port_group, "port_group");
>>  static ENGINE_NODE(fdb_aging, "fdb_aging");
>>  static ENGINE_NODE(fdb_aging_waker, "fdb_aging_waker");
>>
>> @@ -193,7 +193,7 @@ void inc_proc_northd_init(struct ovsdb_idl_loop *nb,
>>      engine_add_input(&en_lflow, &en_sb_multicast_group, NULL);
>>      engine_add_input(&en_lflow, &en_sb_igmp_group, NULL);
>>      engine_add_input(&en_lflow, &en_northd, lflow_northd_handler);
>> -    engine_add_input(&en_lflow, &en_port_group, NULL);
>> +    engine_add_input(&en_lflow, &en_port_group,
> lflow_port_group_handler);
>>
>>      engine_add_input(&en_sync_to_sb_addr_set, &en_nb_address_set,
>>                       sync_to_sb_addr_set_nb_address_set_handler);
>> @@ -202,7 +202,8 @@ void inc_proc_northd_init(struct ovsdb_idl_loop *nb,
>>      engine_add_input(&en_sync_to_sb_addr_set, &en_northd, NULL);
>>      engine_add_input(&en_sync_to_sb_addr_set, &en_sb_address_set, NULL);
>>
>> -    engine_add_input(&en_port_group, &en_nb_port_group, NULL);
>> +    engine_add_input(&en_port_group, &en_nb_port_group,
>> +                     port_group_nb_port_group_handler);
>>      engine_add_input(&en_port_group, &en_sb_port_group, NULL);
>>      /* No need for an explicit handler for northd changes.  Port changes
>>       * that affect port_groups trigger updates to the NB.Port_Group
>> @@ -287,6 +288,12 @@ void inc_proc_northd_init(struct ovsdb_idl_loop *nb,
>>                                  "sbrec_address_set_by_name",
>>                                  sbrec_address_set_by_name);
>>
>> +    struct ovsdb_idl_index *sbrec_port_group_by_name
>> +        = ovsdb_idl_index_create1(sb->idl, &sbrec_port_group_col_name);
>> +    engine_ovsdb_node_add_index(&en_sb_port_group,
>> +                                "sbrec_port_group_by_name",
>> +                                sbrec_port_group_by_name);
>> +
>>      struct ovsdb_idl_index *sbrec_fdb_by_dp_and_port
>>          = ovsdb_idl_index_create2(sb->idl, &sbrec_fdb_col_dp_key,
>>                                    &sbrec_fdb_col_port_key);
>> diff --git a/northd/ovn-northd.c b/northd/ovn-northd.c
>> index 4fa1b039ea..44385d604c 100644
>> --- a/northd/ovn-northd.c
>> +++ b/northd/ovn-northd.c
>> @@ -836,6 +836,10 @@ main(int argc, char *argv[])
>>          ovsdb_idl_omit_alert(ovnsb_idl_loop.idl,
>>                               &sbrec_multicast_group_columns[i]);
>>      }
>> +    for (size_t i = 0; i < SBREC_PORT_GROUP_N_COLUMNS; i++) {
>> +        ovsdb_idl_omit_alert(ovnsb_idl_loop.idl,
>> +                             &sbrec_port_group_columns[i]);
>> +    }
>>
>>      unixctl_command_register("sb-connection-status", "", 0, 0,
>>                               ovn_conn_show, ovnsb_idl_loop.idl);
>> diff --git a/tests/ovn-northd.at b/tests/ovn-northd.at
>> index 1a12513d7a..a04ba2b23f 100644
>> --- a/tests/ovn-northd.at
>> +++ b/tests/ovn-northd.at
>> @@ -8936,6 +8936,252 @@ AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE
> inc-engine/show-stats sync_to_sb_a
>>  AT_CLEANUP
>>  ])
>>
>> +OVN_FOR_EACH_NORTHD_NO_HV([
>> +AT_SETUP([Port group incremental processing])
>> +ovn_start
>> +
>> +check ovn-nbctl ls-add sw1 \
>> +  -- lsp-add sw1 sw1.1     \
>> +  -- lsp-add sw1 sw1.2     \
>> +  -- lsp-add sw1 sw1.3     \
>> +  -- ls-add sw2            \
>> +  -- lsp-add sw2 sw2.1     \
>> +  -- lsp-add sw2 sw2.2     \
>> +  -- lsp-add sw2 sw2.3
>> +
>> +check ovn-nbctl --wait=sb sync
>> +sw1_key=$(fetch_column Datapath_Binding tunnel_key external_ids:name=sw1)
>> +sw2_key=$(fetch_column Datapath_Binding tunnel_key external_ids:name=sw2)
>> +
>> +check_acl_lflows() {
>> +AT_CHECK_UNQUOTED([ovn-sbctl lflow-list sw1 | grep ls_in_acl_eval | grep
> eth.src==41:41:41:41:41:41 -c], [ignore], [dnl
>> +$1
>> +])
>> +AT_CHECK_UNQUOTED([ovn-sbctl lflow-list sw1 | grep ls_in_acl_eval | grep
> eth.src==42:42:42:42:42:42 -c], [ignore], [dnl
>> +$2
>> +])
>> +AT_CHECK_UNQUOTED([ovn-sbctl lflow-list sw2 | grep ls_in_acl_eval | grep
> eth.src==41:41:41:41:41:41 -c], [ignore], [dnl
>> +$3
>> +])
>> +AT_CHECK_UNQUOTED([ovn-sbctl lflow-list sw2 | grep ls_in_acl_eval | grep
> eth.src==42:42:42:42:42:42 -c], [ignore], [dnl
>> +$4
>> +])
>> +}
>> +
>> +AS_BOX([Create new PG1 and PG2])
>> +check as northd ovn-appctl -t NORTHD_TYPE inc-engine/clear-stats
>> +check ovn-nbctl --wait=sb -- pg-add pg1 -- pg-add pg2
>> +dnl The northd node should not recompute, it should handle nb_global
> update
>> +dnl though, therefore "compute: 1".
>> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
> northd], [0], [dnl
>> +Node: northd
>> +- recompute:            0
>> +- compute:              1
>> +- abort:                0
>> +])
>> +dnl The port_group node recomputes every time a NB port group is
> added/deleted.
>> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
> port_group], [0], [dnl
>> +Node: port_group
>> +- recompute:            1
>> +- compute:              0
>> +- abort:                0
>> +])
>> +dnl The port_group node is an input for the lflow node.  Port_group
>> +dnl recompute/compute triggers lflow recompute.
>> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
> lflow], [0], [dnl
>> +Node: lflow
>> +- recompute:            1
>> +- compute:              0
>> +- abort:                0
>> +])
>> +
> 
> It is highly recommended to extend the _DUMP_DB_TABLES macro to include the
> SB port_group table, and call the CHECK_NO_CHANGE_AFTER_RECOMPUTE macro
> after every sub-test to verify the incremental processing is correct (i.e.
> equivalent to the result of a recompute).
> 

Fair point, I'll dot that.

>> +AS_BOX([Add ACLs on PG1 and PG2])
>> +check ovn-nbctl --wait=sb             \
>> +  -- acl-add pg1 from-lport 1 eth.src==41:41:41:41:41:41 allow \
>> +  -- acl-add pg2 from-lport 1 eth.src==42:42:42:42:42:42 allow
>> +
>> +AS_BOX([Add one port from the two switches to PG1])
>> +check as northd ovn-appctl -t NORTHD_TYPE inc-engine/clear-stats
>> +check ovn-nbctl --wait=sb         \
>> +  -- pg-set-ports pg1 sw1.1 sw2.1
>> +check_column "sw1.1" sb:Port_Group ports name="${sw1_key}_pg1"
>> +check_column "sw2.1" sb:Port_Group ports name="${sw2_key}_pg1"
>> +
>> +dnl The northd node should not recompute, it should handle nb_global
> update
>> +dnl though, therefore "compute: 1".
>> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
> northd], [0], [dnl
>> +Node: northd
>> +- recompute:            0
>> +- compute:              1
>> +- abort:                0
>> +])
>> +dnl The port_group node recomputes also every time a port from a new
> switch
>> +dnl is added to the group.
>> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
> port_group], [0], [dnl
>> +Node: port_group
>> +- recompute:            1
>> +- compute:              0
>> +- abort:                0
>> +])
>> +dnl The port_group node is an input for the lflow node.  Port_group
>> +dnl recompute/compute triggers lflow recompute.
>> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
> lflow], [0], [dnl
>> +Node: lflow
>> +- recompute:            1
>> +- compute:              0
>> +- abort:                0
>> +])
>> +dnl Expect ACL1 on sw1 and sw2
>> +check_acl_lflows 1 0 1 0
>> +
>> +AS_BOX([Add one port from the two switches to PG2])
>> +check as northd ovn-appctl -t NORTHD_TYPE inc-engine/clear-stats
>> +check ovn-nbctl --wait=sb \
>> +  -- pg-set-ports pg2 sw1.2 sw2.2
>> +check_column "sw1.1" sb:Port_Group ports name="${sw1_key}_pg1"
>> +check_column "sw2.1" sb:Port_Group ports name="${sw2_key}_pg1"
>> +check_column "sw1.2" sb:Port_Group ports name="${sw1_key}_pg2"
>> +check_column "sw2.2" sb:Port_Group ports name="${sw2_key}_pg2"
>> +
>> +dnl The northd node should not recompute, it should handle nb_global
> update
>> +dnl though, therefore "compute: 1".
>> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
> northd], [0], [dnl
>> +Node: northd
>> +- recompute:            0
>> +- compute:              1
>> +- abort:                0
>> +])
>> +dnl The port_group node recomputes also every time a port from a new
> switch
>> +dnl is added to the group.
>> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
> port_group], [0], [dnl
>> +Node: port_group
>> +- recompute:            1
>> +- compute:              0
>> +- abort:                0
>> +])
>> +dnl The port_group node is an input for the lflow node.  Port_group
>> +dnl recompute/compute triggers lflow recompute (for ACLs).
>> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
> lflow], [0], [dnl
>> +Node: lflow
>> +- recompute:            1
>> +- compute:              0
>> +- abort:                0
>> +])
>> +dnl Expect both ACLs on sw1 and sw2
>> +check_acl_lflows 1 1 1 1
>> +
>> +AS_BOX([Add one more port from the two switches to PG1 and PG2])
>> +check as northd ovn-appctl -t NORTHD_TYPE inc-engine/clear-stats
>> +check ovn-nbctl --wait=sb                     \
>> +  -- pg-set-ports pg1 sw1.1 sw2.1 sw1.3 sw2.3 \
>> +  -- pg-set-ports pg2 sw1.2 sw2.2 sw1.3 sw2.3
>> +check_column "sw1.1 sw1.3" sb:Port_Group ports name="${sw1_key}_pg1"
>> +check_column "sw2.1 sw2.3" sb:Port_Group ports name="${sw2_key}_pg1"
>> +check_column "sw1.2 sw1.3" sb:Port_Group ports name="${sw1_key}_pg2"
>> +check_column "sw2.2 sw2.3" sb:Port_Group ports name="${sw2_key}_pg2"
>> +
>> +dnl The northd node should not recompute, it should handle nb_global
> update
>> +dnl though, therefore "compute: 1".
>> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
> northd], [0], [dnl
>> +Node: northd
>> +- recompute:            0
>> +- compute:              1
>> +- abort:                0
>> +])
>> +dnl We did not change the set of switches a pg is applied to, there
> should be
>> +dnl no recompute.
>> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
> port_group], [0], [dnl
>> +Node: port_group
>> +- recompute:            0
>> +- compute:              1
>> +- abort:                0
>> +])
>> +dnl We did not change the set of switches a pg is applied to, there
> should be
>> +dnl no recompute.
>> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
> lflow], [0], [dnl
>> +Node: lflow
>> +- recompute:            0
>> +- compute:              1
>> +- abort:                0
>> +])
>> +dnl Expect both ACLs on sw1 and sw2
>> +check_acl_lflows 1 1 1 1
>> +
>> +AS_BOX([Remove the last port from PG1 and PG2])
>> +check as northd ovn-appctl -t NORTHD_TYPE inc-engine/clear-stats
>> +check ovn-nbctl --wait=sb         \
>> +  -- pg-set-ports pg1 sw1.1 sw2.1 \
>> +  -- pg-set-ports pg2 sw1.2 sw2.2
>> +check_column "sw1.1" sb:Port_Group ports name="${sw1_key}_pg1"
>> +check_column "sw2.1" sb:Port_Group ports name="${sw2_key}_pg1"
>> +check_column "sw1.2" sb:Port_Group ports name="${sw1_key}_pg2"
>> +check_column "sw2.2" sb:Port_Group ports name="${sw2_key}_pg2"
>> +
>> +dnl The northd node should not recompute, it should handle nb_global
> update
>> +dnl though, therefore "compute: 1".
>> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
> northd], [0], [dnl
>> +Node: northd
>> +- recompute:            0
>> +- compute:              1
>> +- abort:                0
>> +])
>> +dnl We did not change the set of switches a pg is applied to, there
> should be
>> +dnl no recompute.
>> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
> port_group], [0], [dnl
>> +Node: port_group
>> +- recompute:            0
>> +- compute:              1
>> +- abort:                0
>> +])
>> +dnl We did not change the set of switches a pg is applied to, there
> should be
>> +dnl no recompute.
>> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
> lflow], [0], [dnl
>> +Node: lflow
>> +- recompute:            0
>> +- compute:              1
>> +- abort:                0
>> +])
>> +dnl Expect both ACLs on sw1 and sw2
>> +check_acl_lflows 1 1 1 1
>> +
>> +AS_BOX([Remove the second port from PG1 and PG2])
> 
> The below case is testing when both ports of sw2 are removed from pg1 and
> pg2, which surely should be tested, but we are missing the case when each
> switch is removed from one of the PG but still contain the other PG, which
> should also be tested.
> 

You're completely right, it's the case we discussed above.  I'll add
this test too.

Thanks for the review, I'll prepare a v2 soon.

Regards,
Dumitru

> Thanks,
> Han
> 
>> +check as northd ovn-appctl -t NORTHD_TYPE inc-engine/clear-stats
>> +check ovn-nbctl --wait=sb         \
>> +  -- pg-set-ports pg1 sw1.1 \
>> +  -- pg-set-ports pg2 sw1.2
>> +check_column "sw1.1" sb:Port_Group ports name="${sw1_key}_pg1"
>> +check_column "sw1.2" sb:Port_Group ports name="${sw1_key}_pg2"
>> +
>> +dnl The northd node should not recompute, it should handle nb_global
> update
>> +dnl though, therefore "compute: 1".
>> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
> northd], [0], [dnl
>> +Node: northd
>> +- recompute:            0
>> +- compute:              1
>> +- abort:                0
>> +])
>> +dnl We did changed the set of switches a pg is applied to, there should
> be
>> +dnl a recompute.
>> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
> port_group], [0], [dnl
>> +Node: port_group
>> +- recompute:            1
>> +- compute:              0
>> +- abort:                0
>> +])
>> +dnl We did changed the set of switches a pg is applied to, there should
> be
>> +dnl a recompute (for ACLs).
>> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
> lflow], [0], [dnl
>> +Node: lflow
>> +- recompute:            1
>> +- compute:              0
>> +- abort:                0
>> +])
>> +dnl Expect both ACLs on sw1 and not on sw2.
>> +check_acl_lflows 1 1 0 0
>> +
>> +AT_CLEANUP
>> +])
>> +
>>  OVN_FOR_EACH_NORTHD([
>>  AT_SETUP([Check default drop])
>>  AT_KEYWORDS([drop])
>>
>
diff mbox series

Patch

diff --git a/northd/en-lflow.c b/northd/en-lflow.c
index 7f6a7872b2..1321f79036 100644
--- a/northd/en-lflow.c
+++ b/northd/en-lflow.c
@@ -119,6 +119,23 @@  lflow_northd_handler(struct engine_node *node,
     return true;
 }
 
+bool
+lflow_port_group_handler(struct engine_node *node, void *data OVS_UNUSED)
+{
+    struct port_group_data *pg_data =
+        engine_get_input_data("port_group", node);
+
+    /* If the set of switches per port group didn't change then there's no
+     * need to reprocess lflows.  Otherwise, there might be a need to add
+     * port-group ACLs to new switches. */
+    if (!pg_data->ls_port_groups_sets_unchanged) {
+        return false;
+    }
+
+    engine_set_node_state(node, EN_UPDATED);
+    return true;
+}
+
 void *en_lflow_init(struct engine_node *node OVS_UNUSED,
                      struct engine_arg *arg OVS_UNUSED)
 {
diff --git a/northd/en-lflow.h b/northd/en-lflow.h
index 5e3fbc25e3..5417b2faff 100644
--- a/northd/en-lflow.h
+++ b/northd/en-lflow.h
@@ -13,5 +13,6 @@  void en_lflow_run(struct engine_node *node, void *data);
 void *en_lflow_init(struct engine_node *node, struct engine_arg *arg);
 void en_lflow_cleanup(void *data);
 bool lflow_northd_handler(struct engine_node *, void *data);
+bool lflow_port_group_handler(struct engine_node *, void *data);
 
 #endif /* EN_LFLOW_H */
diff --git a/northd/en-port-group.c b/northd/en-port-group.c
index 2c36410246..6902695a01 100644
--- a/northd/en-port-group.c
+++ b/northd/en-port-group.c
@@ -33,15 +33,46 @@  static struct ls_port_group *ls_port_group_create(
 static void ls_port_group_destroy(struct ls_port_group_table *,
                                   struct ls_port_group *);
 
+static bool ls_port_group_process(
+    struct ls_port_group_table *,
+    struct port_group_to_ls_table *,
+    const struct hmap *ls_ports,
+    const struct nbrec_port_group *,
+    struct hmapx *updated_ls_port_groups
+);
+
+static void ls_port_group_record_clear(
+    struct ls_port_group_table *,
+    struct port_group_to_ls *,
+    struct hmapx *updated_ls_port_groups);
+static void ls_port_group_record_prune(struct ls_port_group *);
+
 static struct ls_port_group_record *ls_port_group_record_add(
     struct ls_port_group *,
     const struct nbrec_port_group *,
     const char *port_name);
 
+static struct ls_port_group_record *ls_port_group_record_find(
+    struct ls_port_group *, const struct nbrec_port_group *nb_pg);
+
 static void ls_port_group_record_destroy(
     struct ls_port_group *,
     struct ls_port_group_record *);
 
+static struct port_group_to_ls *port_group_to_ls_create(
+    struct port_group_to_ls_table *,
+    const struct nbrec_port_group *);
+static void port_group_to_ls_destroy(struct port_group_to_ls_table *,
+                                     struct port_group_to_ls *);
+
+static void update_sb_port_group(struct sorted_array *nb_ports,
+                                 const struct sbrec_port_group *sb_pg);
+static void sync_port_group(struct ovsdb_idl_txn *, const char *sb_pg_name,
+                            struct sorted_array *ports,
+                            struct shash *sb_port_groups);
+static const struct sbrec_port_group *sb_port_group_lookup_by_name(
+    struct ovsdb_idl_index *sbrec_port_group_by_name, const char *name);
+
 void
 ls_port_group_table_init(struct ls_port_group_table *table)
 {
@@ -82,39 +113,16 @@  ls_port_group_table_find(const struct ls_port_group_table *table,
 }
 
 void
-ls_port_group_table_build(struct ls_port_group_table *ls_port_groups,
-                          const struct nbrec_port_group_table *pg_table,
-                          const struct hmap *ls_ports)
+ls_port_group_table_build(
+    struct ls_port_group_table *ls_port_groups,
+    struct port_group_to_ls_table *port_group_to_switches,
+    const struct nbrec_port_group_table *pg_table,
+    const struct hmap *ls_ports)
 {
     const struct nbrec_port_group *nb_pg;
     NBREC_PORT_GROUP_TABLE_FOR_EACH (nb_pg, pg_table) {
-        for (size_t i = 0; i < nb_pg->n_ports; i++) {
-            const char *port_name = nb_pg->ports[i]->name;
-            const struct ovn_datapath *od =
-                northd_get_datapath_for_port(ls_ports, port_name);
-
-            if (!od) {
-                static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
-                VLOG_ERR_RL(&rl, "lport %s in port group %s not found.",
-                            port_name, nb_pg->name);
-                continue;
-            }
-
-            if (!od->nbs) {
-                static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
-                VLOG_WARN_RL(&rl, "lport %s in port group %s has no lswitch.",
-                             nb_pg->ports[i]->name,
-                             nb_pg->name);
-                continue;
-            }
-
-            struct ls_port_group *ls_pg =
-                ls_port_group_table_find(ls_port_groups, od->nbs);
-            if (!ls_pg) {
-                ls_pg = ls_port_group_create(ls_port_groups, od->nbs, od->sb);
-            }
-            ls_port_group_record_add(ls_pg, nb_pg, port_name);
-        }
+        ls_port_group_process(ls_port_groups, port_group_to_switches,
+                              ls_ports, nb_pg, NULL);
     }
 }
 
@@ -145,18 +153,11 @@  ls_port_group_table_sync(
             get_sb_port_group_name(ls_pg_rec->nb_pg->name,
                                    ls_pg->sb_datapath_key,
                                    &sb_name);
-            sb_port_group = shash_find_and_delete(&sb_port_groups,
-                                                  ds_cstr(&sb_name));
-            if (!sb_port_group) {
-                sb_port_group = sbrec_port_group_insert(ovnsb_txn);
-                sbrec_port_group_set_name(sb_port_group, ds_cstr(&sb_name));
-            }
-
-            const char **nb_port_names = sset_array(&ls_pg_rec->ports);
-            sbrec_port_group_set_ports(sb_port_group,
-                                       nb_port_names,
-                                       sset_count(&ls_pg_rec->ports));
-            free(nb_port_names);
+            struct sorted_array ports =
+                sorted_array_from_sset(&ls_pg_rec->ports);
+            sync_port_group(ovnsb_txn, ds_cstr(&sb_name),
+                            &ports, &sb_port_groups);
+            sorted_array_destroy(&ports);
         }
     }
     ds_destroy(&sb_name);
@@ -201,31 +202,165 @@  ls_port_group_destroy(struct ls_port_group_table *ls_port_groups,
     }
 }
 
+/* Process a NB.Port_Group record and stores any updated ls_port_groups
+ * in updated_ls_port_groups.  Returns true if a new ls_port_group had
+ * to be created or destroyed.
+ */
+static bool
+ls_port_group_process(struct ls_port_group_table *ls_port_groups,
+                      struct port_group_to_ls_table *port_group_to_switches,
+                      const struct hmap *ls_ports,
+                      const struct nbrec_port_group *nb_pg,
+                      struct hmapx *updated_ls_port_groups)
+{
+    struct hmapx cleared_ls_port_groups =
+        HMAPX_INITIALIZER(&cleared_ls_port_groups);
+    bool ls_port_group_created = false;
+
+    struct port_group_to_ls *pg_ls =
+        port_group_to_ls_table_find(port_group_to_switches, nb_pg);
+    if (!pg_ls) {
+        pg_ls = port_group_to_ls_create(port_group_to_switches, nb_pg);
+    } else {
+        /* Clear all old records corresponding to this port group; we'll
+         * reprocess it below. */
+        ls_port_group_record_clear(ls_port_groups, pg_ls,
+                                   &cleared_ls_port_groups);
+    }
+
+    for (size_t i = 0; i < nb_pg->n_ports; i++) {
+        const char *port_name = nb_pg->ports[i]->name;
+        const struct ovn_datapath *od =
+            northd_get_datapath_for_port(ls_ports, port_name);
+
+        if (!od) {
+            static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
+            VLOG_ERR_RL(&rl, "lport %s in port group %s not found.",
+                        port_name, nb_pg->name);
+            continue;
+        }
+
+        if (!od->nbs) {
+            static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
+            VLOG_WARN_RL(&rl, "lport %s in port group %s has no lswitch.",
+                         nb_pg->ports[i]->name,
+                         nb_pg->name);
+            continue;
+        }
+
+        struct ls_port_group *ls_pg =
+            ls_port_group_table_find(ls_port_groups, od->nbs);
+        if (!ls_pg) {
+            ls_pg = ls_port_group_create(ls_port_groups, od->nbs, od->sb);
+            ls_port_group_created = true;
+        }
+        ls_port_group_record_add(ls_pg, nb_pg, port_name);
+        hmapx_add(&pg_ls->switches,
+                  CONST_CAST(struct nbrec_logical_switch *, od->nbs));
+        if (updated_ls_port_groups) {
+            hmapx_add(updated_ls_port_groups, ls_pg);
+        }
+    }
+
+    bool ls_port_group_destroyed = false;
+    struct hmapx_node *node;
+    HMAPX_FOR_EACH (node, &cleared_ls_port_groups) {
+        struct ls_port_group *ls_pg = node->data;
+
+        ls_port_group_record_prune(ls_pg);
+
+        if (hmap_is_empty(&ls_pg->nb_pgs)) {
+            ls_port_group_destroy(ls_port_groups, ls_pg);
+            ls_port_group_destroyed = true;
+        }
+    }
+    hmapx_destroy(&cleared_ls_port_groups);
+
+    return ls_port_group_created || ls_port_group_destroyed;
+}
+
+/* Destroys all the struct ls_port_group_record that might be associated to
+ * northbound database logical switches.  Stores ls_port_groups that became
+ * were updated in the 'updated_ls_port_groups' map.
+ */
+static void
+ls_port_group_record_clear(struct ls_port_group_table *ls_port_groups,
+                           struct port_group_to_ls *pg_ls,
+                           struct hmapx *cleared_ls_port_groups)
+{
+    struct hmapx_node *node;
+
+    HMAPX_FOR_EACH (node, &pg_ls->switches) {
+        const struct nbrec_logical_switch *nbs = node->data;
+
+        struct ls_port_group *ls_pg =
+            ls_port_group_table_find(ls_port_groups, nbs);
+        if (!ls_pg) {
+            continue;
+        }
+
+        /* Clear ports in the port group record. */
+        struct ls_port_group_record *ls_pg_rec =
+            ls_port_group_record_find(ls_pg, pg_ls->nb_pg);
+        if (!ls_pg_rec) {
+            continue;
+        }
+
+        sset_clear(&ls_pg_rec->ports);
+        hmapx_add(cleared_ls_port_groups, ls_pg);
+    }
+}
+
+static void
+ls_port_group_record_prune(struct ls_port_group *ls_pg)
+{
+    struct ls_port_group_record *ls_pg_rec;
+
+    HMAP_FOR_EACH_SAFE (ls_pg_rec, key_node, &ls_pg->nb_pgs) {
+        if (sset_is_empty(&ls_pg_rec->ports)) {
+            ls_port_group_record_destroy(ls_pg, ls_pg_rec);
+        }
+    }
+}
+
 static struct ls_port_group_record *
 ls_port_group_record_add(struct ls_port_group *ls_pg,
                          const struct nbrec_port_group *nb_pg,
                          const char *port_name)
 {
-    struct ls_port_group_record *ls_pg_rec = NULL;
+    struct ls_port_group_record *ls_pg_rec =
+        ls_port_group_record_find(ls_pg, nb_pg);
     size_t hash = uuid_hash(&nb_pg->header_.uuid);
 
-    HMAP_FOR_EACH_WITH_HASH (ls_pg_rec, key_node, hash, &ls_pg->nb_pgs) {
-        if (ls_pg_rec->nb_pg == nb_pg) {
-            goto done;
-        }
+    if (!ls_pg_rec) {
+        ls_pg_rec = xzalloc(sizeof *ls_pg_rec);
+        *ls_pg_rec = (struct ls_port_group_record) {
+            .nb_pg = nb_pg,
+            .ports = SSET_INITIALIZER(&ls_pg_rec->ports),
+        };
+        hmap_insert(&ls_pg->nb_pgs, &ls_pg_rec->key_node, hash);
     }
 
-    ls_pg_rec = xzalloc(sizeof *ls_pg_rec);
-    *ls_pg_rec = (struct ls_port_group_record) {
-        .nb_pg = nb_pg,
-        .ports = SSET_INITIALIZER(&ls_pg_rec->ports),
-    };
-    hmap_insert(&ls_pg->nb_pgs, &ls_pg_rec->key_node, hash);
-done:
     sset_add(&ls_pg_rec->ports, port_name);
     return ls_pg_rec;
 }
 
+static struct ls_port_group_record *
+ls_port_group_record_find(struct ls_port_group *ls_pg,
+                          const struct nbrec_port_group *nb_pg)
+{
+    size_t hash = uuid_hash(&nb_pg->header_.uuid);
+    struct ls_port_group_record *ls_pg_rec;
+
+    HMAP_FOR_EACH_WITH_HASH (ls_pg_rec, key_node, hash, &ls_pg->nb_pgs) {
+        if (ls_pg_rec->nb_pg == nb_pg) {
+            return ls_pg_rec;
+        }
+    }
+    return NULL;
+}
+
+
 static void
 ls_port_group_record_destroy(struct ls_port_group *ls_pg,
                              struct ls_port_group_record *ls_pg_rec)
@@ -237,6 +372,71 @@  ls_port_group_record_destroy(struct ls_port_group *ls_pg,
     }
 }
 
+void
+port_group_to_ls_table_init(struct port_group_to_ls_table *table)
+{
+    *table = (struct port_group_to_ls_table) {
+        .entries = HMAP_INITIALIZER(&table->entries),
+    };
+}
+
+void
+port_group_to_ls_table_clear(struct port_group_to_ls_table *table)
+{
+    struct port_group_to_ls *pg_ls;
+    HMAP_FOR_EACH_SAFE (pg_ls, key_node, &table->entries) {
+        port_group_to_ls_destroy(table, pg_ls);
+    }
+}
+
+void
+port_group_to_ls_table_destroy(struct port_group_to_ls_table *table)
+{
+    port_group_to_ls_table_clear(table);
+    hmap_destroy(&table->entries);
+}
+
+struct port_group_to_ls *
+port_group_to_ls_table_find(const struct port_group_to_ls_table *table,
+                            const struct nbrec_port_group *nb_pg)
+{
+    struct port_group_to_ls *pg_ls;
+
+    HMAP_FOR_EACH_WITH_HASH (pg_ls, key_node, uuid_hash(&nb_pg->header_.uuid),
+                             &table->entries) {
+        if (nb_pg == pg_ls->nb_pg) {
+            return pg_ls;
+        }
+    }
+    return NULL;
+}
+
+static struct port_group_to_ls *
+port_group_to_ls_create(struct port_group_to_ls_table *table,
+                        const struct nbrec_port_group *nb_pg)
+{
+    struct port_group_to_ls *pg_ls = xmalloc(sizeof *pg_ls);
+
+    *pg_ls = (struct port_group_to_ls) {
+        .nb_pg = nb_pg,
+        .switches = HMAPX_INITIALIZER(&pg_ls->switches),
+    };
+    hmap_insert(&table->entries, &pg_ls->key_node,
+                uuid_hash(&nb_pg->header_.uuid));
+    return pg_ls;
+}
+
+static void
+port_group_to_ls_destroy(struct port_group_to_ls_table *table,
+                         struct port_group_to_ls *pg_ls)
+{
+    if (pg_ls) {
+        hmapx_destroy(&pg_ls->switches);
+        hmap_remove(&table->entries, &pg_ls->key_node);
+        free(pg_ls);
+    }
+}
+
 /* Incremental processing implementation. */
 static struct port_group_input
 port_group_get_input_data(struct engine_node *node)
@@ -259,6 +459,7 @@  en_port_group_init(struct engine_node *node OVS_UNUSED,
     struct port_group_data *pg_data = xmalloc(sizeof *pg_data);
 
     ls_port_group_table_init(&pg_data->ls_port_groups);
+    port_group_to_ls_table_init(&pg_data->port_groups_to_ls);
     return pg_data;
 }
 
@@ -268,6 +469,15 @@  en_port_group_cleanup(void *data_)
     struct port_group_data *data = data_;
 
     ls_port_group_table_destroy(&data->ls_port_groups);
+    port_group_to_ls_table_destroy(&data->port_groups_to_ls);
+}
+
+void
+en_port_group_clear_tracked_data(void *data_)
+{
+    struct port_group_data *data = data_;
+
+    data->ls_port_groups_sets_unchanged = false;
 }
 
 void
@@ -280,7 +490,10 @@  en_port_group_run(struct engine_node *node, void *data_)
     stopwatch_start(PORT_GROUP_RUN_STOPWATCH_NAME, time_msec());
 
     ls_port_group_table_clear(&data->ls_port_groups);
+    port_group_to_ls_table_clear(&data->port_groups_to_ls);
+
     ls_port_group_table_build(&data->ls_port_groups,
+                              &data->port_groups_to_ls,
                               input_data.nbrec_port_group_table,
                               input_data.ls_ports);
 
@@ -291,3 +504,133 @@  en_port_group_run(struct engine_node *node, void *data_)
     stopwatch_stop(PORT_GROUP_RUN_STOPWATCH_NAME, time_msec());
     engine_set_node_state(node, EN_UPDATED);
 }
+
+bool
+port_group_nb_port_group_handler(struct engine_node *node, void *data_)
+{
+    struct port_group_input input_data = port_group_get_input_data(node);
+    struct port_group_data *data = data_;
+    bool success = true;
+
+    const struct nbrec_port_group_table *nb_pg_table =
+        EN_OVSDB_GET(engine_get_input("NB_port_group", node));
+    const struct nbrec_port_group *nb_pg;
+
+    /* Return false if a port group is created or deleted.
+     * Handle I-P for only updated port groups. */
+    NBREC_PORT_GROUP_TABLE_FOR_EACH_TRACKED (nb_pg, nb_pg_table) {
+        if (nbrec_port_group_is_new(nb_pg) ||
+                nbrec_port_group_is_deleted(nb_pg)) {
+            return false;
+        }
+    }
+
+    struct hmapx updated_ls_port_groups =
+        HMAPX_INITIALIZER(&updated_ls_port_groups);
+
+    NBREC_PORT_GROUP_TABLE_FOR_EACH_TRACKED (nb_pg, nb_pg_table) {
+        /* Newly created port groups can't be incrementally processed;
+         * the rest yes. */
+        if (ls_port_group_process(&data->ls_port_groups,
+                                  &data->port_groups_to_ls,
+                                  input_data.ls_ports,
+                                  nb_pg, &updated_ls_port_groups)) {
+            success = false;
+            break;
+        }
+    }
+
+    /* If changes have been successfully processed incrementally then update
+     * the SB too. */
+    if (success) {
+        struct ovsdb_idl_index *sbrec_port_group_by_name =
+            engine_ovsdb_node_get_index(
+                    engine_get_input("SB_port_group", node),
+                    "sbrec_port_group_by_name");
+        struct ds sb_pg_name = DS_EMPTY_INITIALIZER;
+
+        struct hmapx_node *updated_node;
+        HMAPX_FOR_EACH (updated_node, &updated_ls_port_groups) {
+            const struct ls_port_group *ls_pg = updated_node->data;
+            struct ls_port_group_record *ls_pg_rec;
+
+            HMAP_FOR_EACH (ls_pg_rec, key_node, &ls_pg->nb_pgs) {
+                get_sb_port_group_name(ls_pg_rec->nb_pg->name,
+                                        ls_pg->sb_datapath_key,
+                                        &sb_pg_name);
+
+                const struct sbrec_port_group *sb_pg =
+                    sb_port_group_lookup_by_name(sbrec_port_group_by_name,
+                                                 ds_cstr(&sb_pg_name));
+                if (!sb_pg) {
+                    success = false;
+                    break;
+                }
+                struct sorted_array nb_ports =
+                    sorted_array_from_sset(&ls_pg_rec->ports);
+                update_sb_port_group(&nb_ports, sb_pg);
+                sorted_array_destroy(&nb_ports);
+            }
+        }
+        ds_destroy(&sb_pg_name);
+    }
+
+    data->ls_port_groups_sets_unchanged = success;
+    engine_set_node_state(node, EN_UPDATED);
+    hmapx_destroy(&updated_ls_port_groups);
+    return success;
+}
+
+static void
+sb_port_group_apply_diff(const void *arg, const char *item, bool add)
+{
+    const struct sbrec_port_group *pg = arg;
+    if (add) {
+        sbrec_port_group_update_ports_addvalue(pg, item);
+    } else {
+        sbrec_port_group_update_ports_delvalue(pg, item);
+    }
+}
+
+static void
+update_sb_port_group(struct sorted_array *nb_ports,
+                     const struct sbrec_port_group *sb_pg)
+{
+    struct sorted_array sb_ports = sorted_array_from_dbrec(sb_pg, ports);
+    sorted_array_apply_diff(nb_ports, &sb_ports,
+                            sb_port_group_apply_diff, sb_pg);
+    sorted_array_destroy(&sb_ports);
+}
+
+static void
+sync_port_group(struct ovsdb_idl_txn *ovnsb_txn, const char *sb_pg_name,
+                struct sorted_array *ports,
+                struct shash *sb_port_groups)
+{
+    const struct sbrec_port_group *sb_port_group =
+        shash_find_and_delete(sb_port_groups, sb_pg_name);
+    if (!sb_port_group) {
+        sb_port_group = sbrec_port_group_insert(ovnsb_txn);
+        sbrec_port_group_set_name(sb_port_group, sb_pg_name);
+        sbrec_port_group_set_ports(sb_port_group, ports->arr, ports->n);
+    } else {
+        update_sb_port_group(ports, sb_port_group);
+    }
+}
+
+/* Finds and returns the port group set with the given 'name', or NULL
+ * if no such port group exists. */
+static const struct sbrec_port_group *
+sb_port_group_lookup_by_name(struct ovsdb_idl_index *sbrec_port_group_by_name,
+                             const char *name)
+{
+    struct sbrec_port_group *target = sbrec_port_group_index_init_row(
+        sbrec_port_group_by_name);
+    sbrec_port_group_index_set_name(target, name);
+
+    struct sbrec_port_group *retval = sbrec_port_group_index_find(
+        sbrec_port_group_by_name, target);
+
+    sbrec_port_group_index_destroy_row(target);
+    return retval;
+}
diff --git a/northd/en-port-group.h b/northd/en-port-group.h
index 5cbf6c6c4a..c3975f64ee 100644
--- a/northd/en-port-group.h
+++ b/northd/en-port-group.h
@@ -18,6 +18,7 @@ 
 
 #include <stdint.h>
 
+#include "lib/hmapx.h"
 #include "lib/inc-proc-eng.h"
 #include "lib/ovn-nb-idl.h"
 #include "lib/ovn-sb-idl.h"
@@ -54,9 +55,33 @@  struct ls_port_group *ls_port_group_table_find(
     const struct ls_port_group_table *,
     const struct nbrec_logical_switch *);
 
-void ls_port_group_table_build(struct ls_port_group_table *ls_port_groups,
-                               const struct nbrec_port_group_table *,
-                               const struct hmap *ls_ports);
+/* Per port group map of datapaths with ports in the group. */
+struct port_group_to_ls_table {
+    struct hmap entries; /* Stores struct port_group_to_ls. */
+};
+
+struct port_group_to_ls {
+    struct hmap_node key_node; /* Index on 'pg->header_.uuid'. */
+
+    const struct nbrec_port_group *nb_pg;
+
+    /* Map of 'struct nbrec_logical_switch *' with ports in the group. */
+    struct hmapx switches;
+};
+
+void port_group_to_ls_table_init(struct port_group_to_ls_table *);
+void port_group_to_ls_table_clear(struct port_group_to_ls_table *);
+void port_group_to_ls_table_destroy(struct port_group_to_ls_table *);
+
+struct port_group_to_ls *port_group_to_ls_table_find(
+    const struct port_group_to_ls_table *,
+    const struct nbrec_port_group *);
+
+void ls_port_group_table_build(
+    struct ls_port_group_table *ls_port_groups,
+    struct port_group_to_ls_table *port_group_to_switches,
+    const struct nbrec_port_group_table *,
+    const struct hmap *ls_ports);
 void ls_port_group_table_sync(const struct ls_port_group_table *ls_port_groups,
                               const struct sbrec_port_group_table *,
                               struct ovsdb_idl_txn *ovnsb_txn);
@@ -75,10 +100,15 @@  struct port_group_input {
 
 struct port_group_data {
     struct ls_port_group_table ls_port_groups;
+    struct port_group_to_ls_table port_groups_to_ls;
+    bool ls_port_groups_sets_unchanged;
 };
 
 void *en_port_group_init(struct engine_node *, struct engine_arg *);
 void en_port_group_cleanup(void *data);
+void en_port_group_clear_tracked_data(void *data);
 void en_port_group_run(struct engine_node *, void *data);
 
+bool port_group_nb_port_group_handler(struct engine_node *, void *data);
+
 #endif /* EN_PORT_GROUP_H */
diff --git a/northd/inc-proc-northd.c b/northd/inc-proc-northd.c
index 6d5f9e8d16..bd598ba5e2 100644
--- a/northd/inc-proc-northd.c
+++ b/northd/inc-proc-northd.c
@@ -137,7 +137,7 @@  static ENGINE_NODE(mac_binding_aging_waker, "mac_binding_aging_waker");
 static ENGINE_NODE(northd_output, "northd_output");
 static ENGINE_NODE(sync_to_sb, "sync_to_sb");
 static ENGINE_NODE(sync_to_sb_addr_set, "sync_to_sb_addr_set");
-static ENGINE_NODE(port_group, "port_group");
+static ENGINE_NODE_WITH_CLEAR_TRACK_DATA(port_group, "port_group");
 static ENGINE_NODE(fdb_aging, "fdb_aging");
 static ENGINE_NODE(fdb_aging_waker, "fdb_aging_waker");
 
@@ -193,7 +193,7 @@  void inc_proc_northd_init(struct ovsdb_idl_loop *nb,
     engine_add_input(&en_lflow, &en_sb_multicast_group, NULL);
     engine_add_input(&en_lflow, &en_sb_igmp_group, NULL);
     engine_add_input(&en_lflow, &en_northd, lflow_northd_handler);
-    engine_add_input(&en_lflow, &en_port_group, NULL);
+    engine_add_input(&en_lflow, &en_port_group, lflow_port_group_handler);
 
     engine_add_input(&en_sync_to_sb_addr_set, &en_nb_address_set,
                      sync_to_sb_addr_set_nb_address_set_handler);
@@ -202,7 +202,8 @@  void inc_proc_northd_init(struct ovsdb_idl_loop *nb,
     engine_add_input(&en_sync_to_sb_addr_set, &en_northd, NULL);
     engine_add_input(&en_sync_to_sb_addr_set, &en_sb_address_set, NULL);
 
-    engine_add_input(&en_port_group, &en_nb_port_group, NULL);
+    engine_add_input(&en_port_group, &en_nb_port_group,
+                     port_group_nb_port_group_handler);
     engine_add_input(&en_port_group, &en_sb_port_group, NULL);
     /* No need for an explicit handler for northd changes.  Port changes
      * that affect port_groups trigger updates to the NB.Port_Group
@@ -287,6 +288,12 @@  void inc_proc_northd_init(struct ovsdb_idl_loop *nb,
                                 "sbrec_address_set_by_name",
                                 sbrec_address_set_by_name);
 
+    struct ovsdb_idl_index *sbrec_port_group_by_name
+        = ovsdb_idl_index_create1(sb->idl, &sbrec_port_group_col_name);
+    engine_ovsdb_node_add_index(&en_sb_port_group,
+                                "sbrec_port_group_by_name",
+                                sbrec_port_group_by_name);
+
     struct ovsdb_idl_index *sbrec_fdb_by_dp_and_port
         = ovsdb_idl_index_create2(sb->idl, &sbrec_fdb_col_dp_key,
                                   &sbrec_fdb_col_port_key);
diff --git a/northd/ovn-northd.c b/northd/ovn-northd.c
index 4fa1b039ea..44385d604c 100644
--- a/northd/ovn-northd.c
+++ b/northd/ovn-northd.c
@@ -836,6 +836,10 @@  main(int argc, char *argv[])
         ovsdb_idl_omit_alert(ovnsb_idl_loop.idl,
                              &sbrec_multicast_group_columns[i]);
     }
+    for (size_t i = 0; i < SBREC_PORT_GROUP_N_COLUMNS; i++) {
+        ovsdb_idl_omit_alert(ovnsb_idl_loop.idl,
+                             &sbrec_port_group_columns[i]);
+    }
 
     unixctl_command_register("sb-connection-status", "", 0, 0,
                              ovn_conn_show, ovnsb_idl_loop.idl);
diff --git a/tests/ovn-northd.at b/tests/ovn-northd.at
index 1a12513d7a..a04ba2b23f 100644
--- a/tests/ovn-northd.at
+++ b/tests/ovn-northd.at
@@ -8936,6 +8936,252 @@  AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats sync_to_sb_a
 AT_CLEANUP
 ])
 
+OVN_FOR_EACH_NORTHD_NO_HV([
+AT_SETUP([Port group incremental processing])
+ovn_start
+
+check ovn-nbctl ls-add sw1 \
+  -- lsp-add sw1 sw1.1     \
+  -- lsp-add sw1 sw1.2     \
+  -- lsp-add sw1 sw1.3     \
+  -- ls-add sw2            \
+  -- lsp-add sw2 sw2.1     \
+  -- lsp-add sw2 sw2.2     \
+  -- lsp-add sw2 sw2.3
+
+check ovn-nbctl --wait=sb sync
+sw1_key=$(fetch_column Datapath_Binding tunnel_key external_ids:name=sw1)
+sw2_key=$(fetch_column Datapath_Binding tunnel_key external_ids:name=sw2)
+
+check_acl_lflows() {
+AT_CHECK_UNQUOTED([ovn-sbctl lflow-list sw1 | grep ls_in_acl_eval | grep eth.src==41:41:41:41:41:41 -c], [ignore], [dnl
+$1
+])
+AT_CHECK_UNQUOTED([ovn-sbctl lflow-list sw1 | grep ls_in_acl_eval | grep eth.src==42:42:42:42:42:42 -c], [ignore], [dnl
+$2
+])
+AT_CHECK_UNQUOTED([ovn-sbctl lflow-list sw2 | grep ls_in_acl_eval | grep eth.src==41:41:41:41:41:41 -c], [ignore], [dnl
+$3
+])
+AT_CHECK_UNQUOTED([ovn-sbctl lflow-list sw2 | grep ls_in_acl_eval | grep eth.src==42:42:42:42:42:42 -c], [ignore], [dnl
+$4
+])
+}
+
+AS_BOX([Create new PG1 and PG2])
+check as northd ovn-appctl -t NORTHD_TYPE inc-engine/clear-stats
+check ovn-nbctl --wait=sb -- pg-add pg1 -- pg-add pg2
+dnl The northd node should not recompute, it should handle nb_global update
+dnl though, therefore "compute: 1".
+AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats northd], [0], [dnl
+Node: northd
+- recompute:            0
+- compute:              1
+- abort:                0
+])
+dnl The port_group node recomputes every time a NB port group is added/deleted.
+AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats port_group], [0], [dnl
+Node: port_group
+- recompute:            1
+- compute:              0
+- abort:                0
+])
+dnl The port_group node is an input for the lflow node.  Port_group
+dnl recompute/compute triggers lflow recompute.
+AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats lflow], [0], [dnl
+Node: lflow
+- recompute:            1
+- compute:              0
+- abort:                0
+])
+
+AS_BOX([Add ACLs on PG1 and PG2])
+check ovn-nbctl --wait=sb             \
+  -- acl-add pg1 from-lport 1 eth.src==41:41:41:41:41:41 allow \
+  -- acl-add pg2 from-lport 1 eth.src==42:42:42:42:42:42 allow
+
+AS_BOX([Add one port from the two switches to PG1])
+check as northd ovn-appctl -t NORTHD_TYPE inc-engine/clear-stats
+check ovn-nbctl --wait=sb         \
+  -- pg-set-ports pg1 sw1.1 sw2.1
+check_column "sw1.1" sb:Port_Group ports name="${sw1_key}_pg1"
+check_column "sw2.1" sb:Port_Group ports name="${sw2_key}_pg1"
+
+dnl The northd node should not recompute, it should handle nb_global update
+dnl though, therefore "compute: 1".
+AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats northd], [0], [dnl
+Node: northd
+- recompute:            0
+- compute:              1
+- abort:                0
+])
+dnl The port_group node recomputes also every time a port from a new switch
+dnl is added to the group.
+AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats port_group], [0], [dnl
+Node: port_group
+- recompute:            1
+- compute:              0
+- abort:                0
+])
+dnl The port_group node is an input for the lflow node.  Port_group
+dnl recompute/compute triggers lflow recompute.
+AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats lflow], [0], [dnl
+Node: lflow
+- recompute:            1
+- compute:              0
+- abort:                0
+])
+dnl Expect ACL1 on sw1 and sw2
+check_acl_lflows 1 0 1 0
+
+AS_BOX([Add one port from the two switches to PG2])
+check as northd ovn-appctl -t NORTHD_TYPE inc-engine/clear-stats
+check ovn-nbctl --wait=sb \
+  -- pg-set-ports pg2 sw1.2 sw2.2
+check_column "sw1.1" sb:Port_Group ports name="${sw1_key}_pg1"
+check_column "sw2.1" sb:Port_Group ports name="${sw2_key}_pg1"
+check_column "sw1.2" sb:Port_Group ports name="${sw1_key}_pg2"
+check_column "sw2.2" sb:Port_Group ports name="${sw2_key}_pg2"
+
+dnl The northd node should not recompute, it should handle nb_global update
+dnl though, therefore "compute: 1".
+AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats northd], [0], [dnl
+Node: northd
+- recompute:            0
+- compute:              1
+- abort:                0
+])
+dnl The port_group node recomputes also every time a port from a new switch
+dnl is added to the group.
+AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats port_group], [0], [dnl
+Node: port_group
+- recompute:            1
+- compute:              0
+- abort:                0
+])
+dnl The port_group node is an input for the lflow node.  Port_group
+dnl recompute/compute triggers lflow recompute (for ACLs).
+AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats lflow], [0], [dnl
+Node: lflow
+- recompute:            1
+- compute:              0
+- abort:                0
+])
+dnl Expect both ACLs on sw1 and sw2
+check_acl_lflows 1 1 1 1
+
+AS_BOX([Add one more port from the two switches to PG1 and PG2])
+check as northd ovn-appctl -t NORTHD_TYPE inc-engine/clear-stats
+check ovn-nbctl --wait=sb                     \
+  -- pg-set-ports pg1 sw1.1 sw2.1 sw1.3 sw2.3 \
+  -- pg-set-ports pg2 sw1.2 sw2.2 sw1.3 sw2.3
+check_column "sw1.1 sw1.3" sb:Port_Group ports name="${sw1_key}_pg1"
+check_column "sw2.1 sw2.3" sb:Port_Group ports name="${sw2_key}_pg1"
+check_column "sw1.2 sw1.3" sb:Port_Group ports name="${sw1_key}_pg2"
+check_column "sw2.2 sw2.3" sb:Port_Group ports name="${sw2_key}_pg2"
+
+dnl The northd node should not recompute, it should handle nb_global update
+dnl though, therefore "compute: 1".
+AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats northd], [0], [dnl
+Node: northd
+- recompute:            0
+- compute:              1
+- abort:                0
+])
+dnl We did not change the set of switches a pg is applied to, there should be
+dnl no recompute.
+AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats port_group], [0], [dnl
+Node: port_group
+- recompute:            0
+- compute:              1
+- abort:                0
+])
+dnl We did not change the set of switches a pg is applied to, there should be
+dnl no recompute.
+AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats lflow], [0], [dnl
+Node: lflow
+- recompute:            0
+- compute:              1
+- abort:                0
+])
+dnl Expect both ACLs on sw1 and sw2
+check_acl_lflows 1 1 1 1
+
+AS_BOX([Remove the last port from PG1 and PG2])
+check as northd ovn-appctl -t NORTHD_TYPE inc-engine/clear-stats
+check ovn-nbctl --wait=sb         \
+  -- pg-set-ports pg1 sw1.1 sw2.1 \
+  -- pg-set-ports pg2 sw1.2 sw2.2
+check_column "sw1.1" sb:Port_Group ports name="${sw1_key}_pg1"
+check_column "sw2.1" sb:Port_Group ports name="${sw2_key}_pg1"
+check_column "sw1.2" sb:Port_Group ports name="${sw1_key}_pg2"
+check_column "sw2.2" sb:Port_Group ports name="${sw2_key}_pg2"
+
+dnl The northd node should not recompute, it should handle nb_global update
+dnl though, therefore "compute: 1".
+AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats northd], [0], [dnl
+Node: northd
+- recompute:            0
+- compute:              1
+- abort:                0
+])
+dnl We did not change the set of switches a pg is applied to, there should be
+dnl no recompute.
+AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats port_group], [0], [dnl
+Node: port_group
+- recompute:            0
+- compute:              1
+- abort:                0
+])
+dnl We did not change the set of switches a pg is applied to, there should be
+dnl no recompute.
+AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats lflow], [0], [dnl
+Node: lflow
+- recompute:            0
+- compute:              1
+- abort:                0
+])
+dnl Expect both ACLs on sw1 and sw2
+check_acl_lflows 1 1 1 1
+
+AS_BOX([Remove the second port from PG1 and PG2])
+check as northd ovn-appctl -t NORTHD_TYPE inc-engine/clear-stats
+check ovn-nbctl --wait=sb         \
+  -- pg-set-ports pg1 sw1.1 \
+  -- pg-set-ports pg2 sw1.2
+check_column "sw1.1" sb:Port_Group ports name="${sw1_key}_pg1"
+check_column "sw1.2" sb:Port_Group ports name="${sw1_key}_pg2"
+
+dnl The northd node should not recompute, it should handle nb_global update
+dnl though, therefore "compute: 1".
+AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats northd], [0], [dnl
+Node: northd
+- recompute:            0
+- compute:              1
+- abort:                0
+])
+dnl We did changed the set of switches a pg is applied to, there should be
+dnl a recompute.
+AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats port_group], [0], [dnl
+Node: port_group
+- recompute:            1
+- compute:              0
+- abort:                0
+])
+dnl We did changed the set of switches a pg is applied to, there should be
+dnl a recompute (for ACLs).
+AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats lflow], [0], [dnl
+Node: lflow
+- recompute:            1
+- compute:              0
+- abort:                0
+])
+dnl Expect both ACLs on sw1 and not on sw2.
+check_acl_lflows 1 1 0 0
+
+AT_CLEANUP
+])
+
 OVN_FOR_EACH_NORTHD([
 AT_SETUP([Check default drop])
 AT_KEYWORDS([drop])