diff mbox series

[ovs-dev,RFC,ovn,207/207,WIP] optimize port groups and address sets

Message ID 20200812114552.99987-1-vkommadi@redhat.com
State RFC
Headers show
Series None | expand

Commit Message

Venkata Anil Aug. 12, 2020, 11:45 a.m. UTC
From: venkata anil <anilvenkata@redhat.com>

ovn-controller recalculates flows for all the members of a PG or AS
when a new member is added or deleted. Instead, this patch identifies
the changed members and only process their related flows.

Also this patch maintains constant sets for only local ports of a port
group (instead of all ports) to facilitate lesser lexer parsing and
symbols in expr.

This patch is unable to use conjuction flows (as conjuction ids are not
stored) and generate below flows
priority=2002,ct_state=-trk,ip,reg15=0x2,metadata=0x2,nw_src=10.0.0.239 actions=resubmit(,45)

When conjuction ids used by a logical flow are maintained as part of
flow data structures instead of dynamically regenerated during flow
processing, we can update the patch to use the related conjuction ids
for the above flows.

Will test performance impact once the patch is updated with conjuction
flows.

Signed-off-by: Venkata Anil <vkommadi@redhat.com>
---
 controller/binding.c        |   4 +-
 controller/binding.h        |   2 +
 controller/lflow.c          | 405 +++++++++++++++++++++++-
 controller/lflow.h          |  16 +-
 controller/ofctrl.c         |  35 +++
 controller/ofctrl.h         |   4 +
 controller/ovn-controller.c | 604 ++++++++++++++++++++++++++++++------
 7 files changed, 960 insertions(+), 110 deletions(-)
diff mbox series

Patch

diff --git a/controller/binding.c b/controller/binding.c
index 880fbb13b..d4a47572f 100644
--- a/controller/binding.c
+++ b/controller/binding.c
@@ -72,8 +72,6 @@  binding_register_ovs_idl(struct ovsdb_idl *ovs_idl)
 static struct tracked_binding_datapath *tracked_binding_datapath_create(
     const struct sbrec_datapath_binding *,
     bool is_new, struct hmap *tracked_dps);
-static struct tracked_binding_datapath *tracked_binding_datapath_find(
-    struct hmap *, const struct sbrec_datapath_binding *);
 static void tracked_binding_datapath_lport_add(
     const struct sbrec_port_binding *, struct hmap *tracked_datapaths);
 static void update_lport_tracking(const struct sbrec_port_binding *pb,
@@ -722,7 +720,7 @@  tracked_binding_datapath_create(const struct sbrec_datapath_binding *dp,
     return t_dp;
 }
 
-static struct tracked_binding_datapath *
+struct tracked_binding_datapath *
 tracked_binding_datapath_find(struct hmap *tracked_datapaths,
                               const struct sbrec_datapath_binding *dp)
 {
diff --git a/controller/binding.h b/controller/binding.h
index c9740560f..c96eb53ab 100644
--- a/controller/binding.h
+++ b/controller/binding.h
@@ -134,4 +134,6 @@  bool binding_handle_ovs_interface_changes(struct binding_ctx_in *,
 bool binding_handle_port_binding_changes(struct binding_ctx_in *,
                                          struct binding_ctx_out *);
 void binding_tracked_dp_destroy(struct hmap *tracked_datapaths);
+struct tracked_binding_datapath *tracked_binding_datapath_find(
+    struct hmap *, const struct sbrec_datapath_binding *);
 #endif /* controller/binding.h */
diff --git a/controller/lflow.c b/controller/lflow.c
index b2f585727..78a23f790 100644
--- a/controller/lflow.c
+++ b/controller/lflow.c
@@ -15,6 +15,7 @@ 
 
 #include <config.h>
 #include "lflow.h"
+#include "binding.h"
 #include "coverage.h"
 #include "ha-chassis.h"
 #include "lport.h"
@@ -50,9 +51,16 @@  lflow_init(void)
 }
 
 struct lookup_port_aux {
+    struct ovsdb_idl_index *sbrec_multicast_group_by_name_datapath;
+    struct ovsdb_idl_index *sbrec_port_binding_by_name;
+    const struct sbrec_datapath_binding *dp;    
+};
+
+struct lookup_tracked_port_aux {
     struct ovsdb_idl_index *sbrec_multicast_group_by_name_datapath;
     struct ovsdb_idl_index *sbrec_port_binding_by_name;
     const struct sbrec_datapath_binding *dp;
+    struct hmap *tracked_dp_bindings;
 };
 
 struct condition_aux {
@@ -63,6 +71,7 @@  struct condition_aux {
     /* Resource reference to store the port name referenced
      * in is_chassis_resident() to the logical flow. */
     struct lflow_resource_ref *lfrr;
+    bool lflow_ref_portgroup_addrset;
 };
 
 static bool
@@ -74,6 +83,9 @@  consider_logical_flow(const struct sbrec_logical_flow *lflow,
                       struct lflow_ctx_out *l_ctx_out);
 static void lflow_resource_add(struct lflow_resource_ref *, enum ref_type,
                                const char *ref_name, const struct uuid *);
+static bool consider_logical_flow_for_pg_as_member_updates(const struct sbrec_logical_flow *,
+                      struct lflow_ctx_in *, struct lflow_ctx_out *,
+                      enum ref_type, bool added, const struct shash *);
 
 static bool
 lookup_port_cb(const void *aux_, const char *port_name, unsigned int *portp)
@@ -128,11 +140,13 @@  is_chassis_resident_cb(const void *c_aux_, const char *port_name)
     }
 
     /* Store the port_name to lflow reference. */
-    int64_t dp_id = pb->datapath->tunnel_key;
-    char buf[16];
-    get_unique_lport_key(dp_id, pb->tunnel_key, buf, sizeof(buf));
-    lflow_resource_add(c_aux->lfrr, REF_TYPE_PORTBINDING, buf,
-                       &c_aux->lflow->header_.uuid);
+    if (!c_aux->lflow_ref_portgroup_addrset) {    
+        int64_t dp_id = pb->datapath->tunnel_key;
+        char buf[16];
+        get_unique_lport_key(dp_id, pb->tunnel_key, buf, sizeof(buf));
+        lflow_resource_add(c_aux->lfrr, REF_TYPE_PORTBINDING, buf,
+                        &c_aux->lflow->header_.uuid);
+    }
 
     if (strcmp(pb->type, "chassisredirect")) {
         /* for non-chassisredirect ports */
@@ -394,10 +408,10 @@  lflow_handle_changed_flows(struct lflow_ctx_in *l_ctx_in,
 }
 
 bool
-lflow_handle_changed_ref(enum ref_type ref_type, const char *ref_name,
+lflow_handle_ref_member_updates(enum ref_type ref_type, const char *ref_name,
                          struct lflow_ctx_in *l_ctx_in,
                          struct lflow_ctx_out *l_ctx_out,
-                         bool *changed)
+                         bool *changed, bool added, const struct shash *ref_shash)
 {
     struct ref_lflow_node *rlfn =
         ref_lflow_lookup(&l_ctx_out->lfrr->ref_lflow_table, ref_type,
@@ -411,6 +425,88 @@  lflow_handle_changed_ref(enum ref_type ref_type, const char *ref_name,
     *changed = false;
     bool ret = true;
 
+    /* lflow_resource_ref maintaints references between resources and the flows
+       and not between resource members. So avoid updating references during
+       member update.
+    */
+
+    /* Re-parse the related lflows. */
+    struct lflow_ref_list_node *lrln;
+    LIST_FOR_EACH (lrln, ref_list, &rlfn->ref_lflow_head) {
+        const struct sbrec_logical_flow *lflow =
+            sbrec_logical_flow_table_get_for_uuid(l_ctx_in->logical_flow_table,
+                                                  &lrln->lflow_uuid);
+        if (!lflow) {
+            VLOG_DBG("Reprocess lflow "UUID_FMT" for resource type: %d,"
+                     " name: %s - not found.",
+                     UUID_ARGS(&lrln->lflow_uuid),
+                     ref_type, ref_name);
+            continue;
+        }
+        VLOG_DBG("Reprocess lflow "UUID_FMT" for resource type: %d,"
+                 " name: %s, operation: %d",
+                 UUID_ARGS(&lrln->lflow_uuid),
+                 ref_type, ref_name, added);
+        
+        if(!consider_logical_flow_for_pg_as_member_updates(lflow, l_ctx_in, l_ctx_out,
+                                                          ref_type, added, ref_shash)) {
+            ret = false;
+            break;
+        }
+        *changed = true;
+    }
+    return ret;
+}
+
+bool
+lflow_handle_changed_ref(enum ref_type ref_type, const char *ref_name,
+                         struct lflow_ctx_in *l_ctx_in,
+                         struct lflow_ctx_out *l_ctx_out,
+                         bool *changed, enum ref_op ref_op)
+{
+    struct ref_lflow_node *rlfn =
+        ref_lflow_lookup(&l_ctx_out->lfrr->ref_lflow_table, ref_type,
+                         ref_name);
+    if (!rlfn) {
+        *changed = false;
+        return true;
+    }
+    VLOG_DBG("Handle changed lflow reference for resource type: %d,"
+             " name: %s. ref_op: %d, ref_type: %d",
+             ref_type, ref_name, ref_op, ref_type);
+    *changed = false;
+    bool ret = true;
+  
+    /* process PG or AS member update by using relevant constant set */ 
+    if (ref_op == REF_OP_UPDATED) {                
+        if (ref_type == REF_TYPE_PORTGROUP) {
+            if (shash_find(l_ctx_in->port_group_members_added, ref_name)) {            
+                ret = lflow_handle_ref_member_updates(ref_type, ref_name, l_ctx_in,
+                                                      l_ctx_out, changed, true,
+                                                      l_ctx_in->port_group_members_added);
+            }
+            if (shash_find(l_ctx_in->port_group_members_deleted, ref_name)) {
+                ret = lflow_handle_ref_member_updates(ref_type, ref_name, l_ctx_in,
+                                                      l_ctx_out, changed, false,
+                                                      l_ctx_in->port_group_members_deleted);         
+            } 
+            return ret;          
+        }
+        if (ref_type == REF_TYPE_ADDRSET) {
+            if (shash_find(l_ctx_in->address_set_members_added, ref_name)) {
+                ret = lflow_handle_ref_member_updates(ref_type, ref_name, l_ctx_in,
+                                                     l_ctx_out, changed, true,
+                                                     l_ctx_in->address_set_members_added);
+            }
+            if (shash_find(l_ctx_in->address_set_members_deleted, ref_name)) {            
+                ret = lflow_handle_ref_member_updates(ref_type, ref_name, l_ctx_in,
+                                                     l_ctx_out, changed, false,
+                                                     l_ctx_in->address_set_members_deleted);
+            }
+            return ret;            
+        }
+    }
+
     hmap_remove(&l_ctx_out->lfrr->ref_lflow_table, &rlfn->node);
 
     struct lflow_ref_list_node *lrln, *next;
@@ -562,6 +658,7 @@  consider_logical_flow(const struct sbrec_logical_flow *lflow,
     /* Translate OVN match into table of OpenFlow matches. */
     struct hmap matches;
     struct expr *expr;
+    bool lflow_ref_portgroup_addrset = false;
 
     struct sset addr_sets_ref = SSET_INITIALIZER(&addr_sets_ref);
     struct sset port_groups_ref = SSET_INITIALIZER(&port_groups_ref);
@@ -569,7 +666,7 @@  consider_logical_flow(const struct sbrec_logical_flow *lflow,
                              l_ctx_in->port_groups,
                              &addr_sets_ref, &port_groups_ref,
                              lflow->logical_datapath->tunnel_key,
-                             &error);
+                             &error);                             
     const char *addr_set_name;
     SSET_FOR_EACH (addr_set_name, &addr_sets_ref) {
         lflow_resource_add(l_ctx_out->lfrr, REF_TYPE_ADDRSET, addr_set_name,
@@ -580,6 +677,10 @@  consider_logical_flow(const struct sbrec_logical_flow *lflow,
         lflow_resource_add(l_ctx_out->lfrr, REF_TYPE_PORTGROUP,
                            port_group_name, &lflow->header_.uuid);
     }
+    /* ref created for new/del PG,AS or ACL*/
+    if (!sset_is_empty(&port_groups_ref) || !sset_is_empty(&addr_sets_ref)) {
+        lflow_ref_portgroup_addrset = true;
+    }
     sset_destroy(&addr_sets_ref);
     sset_destroy(&port_groups_ref);
 
@@ -612,7 +713,8 @@  consider_logical_flow(const struct sbrec_logical_flow *lflow,
         .chassis = l_ctx_in->chassis,
         .active_tunnels = l_ctx_in->active_tunnels,
         .lflow = lflow,
-        .lfrr = l_ctx_out->lfrr
+        .lfrr = l_ctx_out->lfrr,
+        .lflow_ref_portgroup_addrset = lflow_ref_portgroup_addrset
     };
     expr = expr_simplify(expr, is_chassis_resident_cb, &cond_aux);
     expr = expr_normalize(expr);
@@ -668,8 +770,17 @@  consider_logical_flow(const struct sbrec_logical_flow *lflow,
                 int64_t dp_id = lflow->logical_datapath->tunnel_key;
                 char buf[16];
                 get_unique_lport_key(dp_id, port_id, buf, sizeof(buf));
-                lflow_resource_add(l_ctx_out->lfrr, REF_TYPE_PORTBINDING, buf,
+                /*
+                Dont maintain references between ports and logical flows which is referencing port groups.
+                Below is hit, when a new port is added (still not bound)
+                1) logical flow created for the port (some flows reference port like inport== etc..)
+                2) As logical port may or many not be bound to a chassis (PB entry exists but with or without chassis)
+                In the above case we add a lflow_ref entry for the port binding reference.               
+                */
+                if (!lflow_ref_portgroup_addrset) {
+                    lflow_resource_add(l_ctx_out->lfrr, REF_TYPE_PORTBINDING, buf,
                                    &lflow->header_.uuid);
+                }
                 if (!sset_contains(l_ctx_in->local_lport_ids, buf)) {
                     VLOG_DBG("lflow "UUID_FMT
                              " port %s in match is not local, skip",
@@ -711,6 +822,277 @@  consider_logical_flow(const struct sbrec_logical_flow *lflow,
     return update_conj_id_ofs(l_ctx_out->conj_id_ofs, n_conjs);
 }
 
+static bool
+lookup_tracked_port_cb(const void *aux_, const char *port_name, unsigned int *portp)
+{
+    const struct lookup_tracked_port_aux *aux = aux_;
+
+    const struct sbrec_port_binding *pb
+        = lport_lookup_by_name(aux->sbrec_port_binding_by_name, port_name);
+    if (pb && pb->datapath == aux->dp) {
+        *portp = pb->tunnel_key;
+        return true;
+    }
+
+    /* We need to identify flows for deleted ports from this chassis
+       and remove these flows. Deleted local ports will be part of
+       tracked_dp_bindings */
+    if (aux->tracked_dp_bindings) {
+        struct tracked_binding_datapath *tracked_dp =
+            tracked_binding_datapath_find(aux->tracked_dp_bindings, aux->dp);
+        if (tracked_dp) {
+            struct tracked_binding_lport *lport = 
+                shash_find_data(&tracked_dp->lports, port_name);
+            if (lport) {        
+                const struct sbrec_port_binding *binding = lport->pb;
+                if (binding) {
+                    *portp = binding->tunnel_key;
+                    return true;
+                }
+            }
+        }
+    }       
+    return false;
+}
+
+/* This function is called for the local port of the port group,
+ * so always return true */
+static bool
+pg_as_is_chassis_resident_cb(const void *c_aux_, const char *port_name)
+{
+    const struct condition_aux *c_aux = c_aux_;
+    if (c_aux && port_name ) {
+        return true;
+    }
+    return false;
+}
+
+/* This function will be called when PG or AS member updated. */
+static bool
+consider_logical_flow_for_pg_as_member_updates(const struct sbrec_logical_flow *lflow,
+                      struct lflow_ctx_in *l_ctx_in, struct lflow_ctx_out *l_ctx_out,
+                      enum ref_type ref_type, bool added, const struct shash *ref_shash)
+{
+    /* Determine translation of logical table IDs to physical table IDs. */
+    bool ingress = !strcmp(lflow->pipeline, "ingress");
+
+    const struct sbrec_datapath_binding *ldp = lflow->logical_datapath;
+    if (!ldp) {
+        VLOG_DBG("lflow "UUID_FMT" has no datapath binding, skip",
+                 UUID_ARGS(&lflow->header_.uuid));
+        return true;
+    }
+    if (!get_local_datapath(l_ctx_in->local_datapaths, ldp->tunnel_key)) {
+        VLOG_DBG("lflow "UUID_FMT" is not for local datapath, skip",
+                 UUID_ARGS(&lflow->header_.uuid));
+        return true;
+    }
+
+    /* Determine translation of logical table IDs to physical table IDs. */
+    uint8_t first_ptable = (ingress
+                            ? OFTABLE_LOG_INGRESS_PIPELINE
+                            : OFTABLE_LOG_EGRESS_PIPELINE);
+    uint8_t ptable = first_ptable + lflow->table_id;
+    uint8_t output_ptable = (ingress
+                             ? OFTABLE_REMOTE_OUTPUT
+                             : OFTABLE_SAVE_INPORT);
+
+    uint64_t ovnacts_stub[1024 / 8];
+    struct ofpbuf ovnacts = OFPBUF_STUB_INITIALIZER(ovnacts_stub);
+    /* Parse OVN logical actions.
+     *
+     * XXX Deny changes to 'outport' in egress pipeline. */
+    struct ovnact_parse_params pp = {
+        .symtab = &symtab,
+        .dhcp_opts = NULL,
+        .dhcpv6_opts = NULL,
+        .nd_ra_opts = NULL,
+        .controller_event_opts = NULL,
+
+        .pipeline = ingress ? OVNACT_P_INGRESS : OVNACT_P_EGRESS,
+        .n_tables = LOG_PIPELINE_LEN,
+        .cur_ltable = lflow->table_id,
+    };
+    struct expr *prereqs;
+    char *error;
+
+    error = ovnacts_parse_string(lflow->actions, &pp, &ovnacts, &prereqs);
+    if (error) {
+        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
+        VLOG_WARN_RL(&rl, "error parsing actions \"%s\": %s",
+            lflow->actions, error);
+        free(error);
+        ovnacts_free(ovnacts.data, ovnacts.size);
+        ofpbuf_uninit(&ovnacts);
+        return true;
+    }
+
+    /* Translate OVN match into table of OpenFlow matches. */
+    struct hmap matches;
+    struct expr *expr;
+
+    /* Avoid adding lflow references which are already added or deleted
+       when PG or AS got added or deleted. Use updated AS or PG passed
+       by caller of the function.*/
+    const struct shash *port_groups = l_ctx_in->port_groups;
+    const struct shash *addr_sets = l_ctx_in->addr_sets;
+    if (ref_type == REF_TYPE_ADDRSET) {
+        addr_sets = ref_shash;
+    } else {
+        port_groups = ref_shash;
+    }
+    expr = expr_parse_string(lflow->match, &symtab, addr_sets,
+                             port_groups, NULL, NULL,
+                             lflow->logical_datapath->tunnel_key,
+                             &error); 
+    if (!error) {
+        if (prereqs) {
+            expr = expr_combine(EXPR_T_AND, expr, prereqs);
+            prereqs = NULL;
+        }
+        expr = expr_annotate(expr, &symtab, &error);
+    }
+    if (error) {
+        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
+        VLOG_WARN_RL(&rl, "error parsing match \"%s\": %s",
+                     lflow->match, error);
+        expr_destroy(prereqs);
+        free(error);
+        ovnacts_free(ovnacts.data, ovnacts.size);
+        ofpbuf_uninit(&ovnacts);
+        return true;
+    }
+
+    struct lookup_tracked_port_aux aux = {
+        .sbrec_multicast_group_by_name_datapath
+            = l_ctx_in->sbrec_multicast_group_by_name_datapath,
+        .sbrec_port_binding_by_name = l_ctx_in->sbrec_port_binding_by_name,
+        .dp = lflow->logical_datapath,
+        .tracked_dp_bindings = l_ctx_in->tracked_dp_bindings
+    };
+    struct condition_aux cond_aux = {
+        .sbrec_port_binding_by_name = l_ctx_in->sbrec_port_binding_by_name,
+        .chassis = l_ctx_in->chassis,
+        .active_tunnels = l_ctx_in->active_tunnels,
+        .lflow = lflow,
+        .lfrr = l_ctx_out->lfrr,
+        .lflow_ref_portgroup_addrset = true
+    };
+    expr = expr_simplify(expr, pg_as_is_chassis_resident_cb, &cond_aux);
+    expr = expr_normalize(expr);
+    /* note: matches won't be having conjuction ids. When flows store
+       conjuction ids, we can enhance expr_to_matches to return conj ids
+       even for 1D while processing PG/AS flow */
+    uint32_t n_conjs = expr_to_matches(expr, lookup_tracked_port_cb, &aux,
+                                       &matches);
+    expr_destroy(expr);
+
+    if (hmap_is_empty(&matches)) {
+        VLOG_DBG("lflow "UUID_FMT" matches are empty, skip",
+                 UUID_ARGS(&lflow->header_.uuid));
+        ovnacts_free(ovnacts.data, ovnacts.size);
+        ofpbuf_uninit(&ovnacts);
+        expr_matches_destroy(&matches);
+        return true;
+    }
+
+    uint64_t ofpacts_stub[1024 / 8];
+    struct ofpbuf ofpacts = OFPBUF_STUB_INITIALIZER(ofpacts_stub);
+    /* No actions required for deleting flows */
+    if (!added) {
+        struct expr_match *m;
+        HMAP_FOR_EACH (m, hmap_node, &matches) {
+            match_set_metadata(&m->match,
+                            htonll(lflow->logical_datapath->tunnel_key));
+            if (m->match.wc.masks.conj_id) {
+                m->match.flow.conj_id += *l_ctx_out->conj_id_ofs;
+            }
+            ofctrl_remove_specific_flow(l_ctx_out->flow_table, ptable, lflow->priority,
+                            lflow->header_.uuid.parts[0], &m->match, &ofpacts,
+                            &lflow->header_.uuid);                     
+        }
+        expr_matches_destroy(&matches);
+        return true;
+    }
+
+    /* Encode OVN logical actions into OpenFlow. */
+    struct ovnact_encode_params ep = {
+        .lookup_port = lookup_port_cb,
+        .tunnel_ofport = tunnel_ofport_cb,
+        .aux = &aux,
+        .is_switch = datapath_is_switch(ldp),
+        .group_table = l_ctx_out->group_table,
+        .meter_table = l_ctx_out->meter_table,
+        .lflow_uuid = lflow->header_.uuid,
+
+        .pipeline = ingress ? OVNACT_P_INGRESS : OVNACT_P_EGRESS,
+        .ingress_ptable = OFTABLE_LOG_INGRESS_PIPELINE,
+        .egress_ptable = OFTABLE_LOG_EGRESS_PIPELINE,
+        .output_ptable = output_ptable,
+        .mac_bind_ptable = OFTABLE_MAC_BINDING,
+        .mac_lookup_ptable = OFTABLE_MAC_LOOKUP,
+    };
+    ovnacts_encode(ovnacts.data, ovnacts.size, &ep, &ofpacts);
+    ovnacts_free(ovnacts.data, ovnacts.size);
+    ofpbuf_uninit(&ovnacts);
+
+    /* Prepare the OpenFlow matches for adding to the flow table. */
+    struct expr_match *m;
+    HMAP_FOR_EACH (m, hmap_node, &matches) {
+        match_set_metadata(&m->match,
+                           htonll(lflow->logical_datapath->tunnel_key));
+        if (m->match.wc.masks.conj_id) {
+            m->match.flow.conj_id += *l_ctx_out->conj_id_ofs;
+        }
+        if (datapath_is_switch(ldp)) {
+            unsigned int reg_index
+                = (ingress ? MFF_LOG_INPORT : MFF_LOG_OUTPORT) - MFF_REG0;
+            int64_t port_id = m->match.flow.regs[reg_index];
+            if (port_id) {
+                int64_t dp_id = lflow->logical_datapath->tunnel_key;
+                char buf[16];
+                get_unique_lport_key(dp_id, port_id, buf, sizeof(buf));
+                if (!sset_contains(l_ctx_in->local_lport_ids, buf)) {
+                    VLOG_DBG("lflow "UUID_FMT
+                             " port %s in match is not local, skip",
+                             UUID_ARGS(&lflow->header_.uuid),
+                             buf);
+                    continue;
+                }
+            }
+        }
+        if (!m->n) {            
+            ofctrl_add_flow(l_ctx_out->flow_table, ptable, lflow->priority,
+                            lflow->header_.uuid.parts[0], &m->match, &ofpacts,
+                            &lflow->header_.uuid);
+             
+        } else {
+            uint64_t conj_stubs[64 / 8];
+            struct ofpbuf conj;
+
+            ofpbuf_use_stub(&conj, conj_stubs, sizeof conj_stubs);
+            for (int i = 0; i < m->n; i++) {
+                const struct cls_conjunction *src = &m->conjunctions[i];
+                struct ofpact_conjunction *dst;
+
+                dst = ofpact_put_CONJUNCTION(&conj);
+                dst->id = src->id + *l_ctx_out->conj_id_ofs;
+                dst->clause = src->clause;
+                dst->n_clauses = src->n_clauses;
+            }            
+            ofctrl_add_or_append_flow(l_ctx_out->flow_table, ptable,
+                                      lflow->priority, 0,
+                                      &m->match, &conj, &lflow->header_.uuid);
+            ofpbuf_uninit(&conj);
+        }
+    }
+
+    /* Clean up. */
+    expr_matches_destroy(&matches);
+    ofpbuf_uninit(&ofpacts);
+    return update_conj_id_ofs(l_ctx_out->conj_id_ofs, n_conjs);
+}
+
 static void
 put_load(const uint8_t *data, size_t len,
          enum mf_field_id dst, int ofs, int n_bits,
@@ -936,5 +1318,6 @@  lflow_handle_flows_for_lport(const struct sbrec_port_binding *pb,
                          sizeof(pb_ref_name));
 
     return lflow_handle_changed_ref(REF_TYPE_PORTBINDING, pb_ref_name,
-                                    l_ctx_in, l_ctx_out, &changed);
+                                    l_ctx_in, l_ctx_out, &changed,
+                                    REF_OP_UPDATED);
 }
diff --git a/controller/lflow.h b/controller/lflow.h
index ae02eaf5e..eaa0148ba 100644
--- a/controller/lflow.h
+++ b/controller/lflow.h
@@ -78,6 +78,12 @@  enum ref_type {
     REF_TYPE_PORTBINDING
 };
 
+enum ref_op {
+    REF_OP_ADDED,
+    REF_OP_DELETED,    
+    REF_OP_UPDATED    
+};
+
 /* Maintains the relationship for a pair of named resource and
  * a lflow, indexed by both ref_lflow_table and lflow_ref_table. */
 struct lflow_ref_list_node {
@@ -134,6 +140,11 @@  struct lflow_ctx_in {
     const struct shash *port_groups;
     const struct sset *active_tunnels;
     const struct sset *local_lport_ids;
+    struct hmap *tracked_dp_bindings;
+    struct shash *port_group_members_added;
+    struct shash *port_group_members_deleted;
+    struct shash *address_set_members_added;
+    struct shash *address_set_members_deleted;
 };
 
 struct lflow_ctx_out {
@@ -150,7 +161,10 @@  void lflow_run(struct lflow_ctx_in *, struct lflow_ctx_out *);
 bool lflow_handle_changed_flows(struct lflow_ctx_in *, struct lflow_ctx_out *);
 bool lflow_handle_changed_ref(enum ref_type, const char *ref_name,
                               struct lflow_ctx_in *, struct lflow_ctx_out *,
-                              bool *changed);
+                              bool *changed, enum ref_op);
+bool lflow_handle_ref_member_updates(enum ref_type, const char *ref_name,
+                         struct lflow_ctx_in *, struct lflow_ctx_out *,
+                         bool *changed, bool added, const struct shash *);
 void lflow_handle_changed_neighbors(
     struct ovsdb_idl_index *sbrec_port_binding_by_name,
     const struct sbrec_mac_binding_table *,
diff --git a/controller/ofctrl.c b/controller/ofctrl.c
index b8a9c2da8..763bda256 100644
--- a/controller/ofctrl.c
+++ b/controller/ofctrl.c
@@ -709,6 +709,41 @@  ofctrl_remove_flows(struct ovn_desired_flow_table *flow_table,
     ovn_extend_table_remove_desired(meters, sb_uuid);
 }
 
+/* ANIL */
+void
+ofctrl_check_and_remove_flow(struct ovn_desired_flow_table *flow_table,
+                          uint8_t table_id, uint16_t priority,
+                          uint64_t cookie, const struct match *match,
+                          const struct ofpbuf *actions,
+                          const struct uuid *sb_uuid,
+                          bool log_duplicate_flow)
+{
+    struct ovn_flow *f = ovn_flow_alloc(table_id, priority, cookie, match,
+                                        actions, sb_uuid);
+
+    ovn_flow_log(f, "ofctrl_check_and_remove_flow");
+
+    struct ovn_flow *existing;
+    existing = ovn_flow_lookup(&flow_table->match_flow_table, f, false);
+    if (existing) {        
+        hmap_remove(&flow_table->match_flow_table, &existing->match_hmap_node);
+        hindex_remove(&flow_table->uuid_flow_table, &existing->uuid_hindex_node);
+        ovn_flow_destroy(existing);
+    }
+    ovn_flow_destroy(f);
+}
+
+/* ANIL */
+void
+ofctrl_remove_specific_flow(struct ovn_desired_flow_table *desired_flows,
+                uint8_t table_id, uint16_t priority, uint64_t cookie,
+                const struct match *match, const struct ofpbuf *actions,
+                const struct uuid *sb_uuid)
+{
+    ofctrl_check_and_remove_flow(desired_flows, table_id, priority, cookie,
+                              match, actions, sb_uuid, true);
+}
+
 void
 ofctrl_add_flow(struct ovn_desired_flow_table *desired_flows,
                 uint8_t table_id, uint16_t priority, uint64_t cookie,
diff --git a/controller/ofctrl.h b/controller/ofctrl.h
index 21d2ce648..b22f55b49 100644
--- a/controller/ofctrl.h
+++ b/controller/ofctrl.h
@@ -77,6 +77,10 @@  void ofctrl_add_or_append_flow(struct ovn_desired_flow_table *desired_flows,
                                const struct uuid *sb_uuid);
 
 void ofctrl_remove_flows(struct ovn_desired_flow_table *, const struct uuid *);
+void ofctrl_remove_specific_flow(struct ovn_desired_flow_table *,
+                uint8_t table_id, uint16_t priority, uint64_t cookie,
+                const struct match *, const struct ofpbuf *,
+                const struct uuid *);
 
 void ovn_desired_flow_table_init(struct ovn_desired_flow_table *);
 void ovn_desired_flow_table_clear(struct ovn_desired_flow_table *);
diff --git a/controller/ovn-controller.c b/controller/ovn-controller.c
index 5ca32ac43..10079862f 100644
--- a/controller/ovn-controller.c
+++ b/controller/ovn-controller.c
@@ -83,6 +83,65 @@  static unixctl_cb_func cluster_state_reset_cmd;
 static char *parse_options(int argc, char *argv[]);
 OVS_NO_RETURN static void usage(void);
 
+/* 'members' shash maintains all addresses of a port group. It is used to
+ * identify new addresses or deleted addresses and generating corresponding
+ * symtab entries.
+ * 'members_deleted' and 'members_added' represent symtab entries for the
+ * deleted or added address sets. When address added to AS or deleted from AS
+ * these symtab entries gets genered and later used in corresponding flow
+ * calculation.
+ * 'new', 'deleted' and 'updated' sset will represent the corresponding AS */
+struct ed_type_addr_sets {
+    struct shash addr_sets;
+    struct shash members_added;
+    struct shash members_deleted;
+    bool change_tracked;
+    struct sset new;
+    struct sset deleted;
+    struct sset updated;
+    struct shash members;
+};
+
+/* This structure is added to members_updated shash while processing a PG.
+ * It maintaints all the ports of the PG in 'ports' sset. These ports will be
+ * retained troughout the life cycle of ovn-controller, hence not cleared in
+ * en_port_groups_clear_tracked_data.
+ * 'added' and 'deleted' sset are part of tracked data and represents ports
+ * added or deleted in this engine run and cleared in
+ * en_port_groups_clear_tracked_data. These are used to generate
+ * members_deleted_cs and members_added_cs symtab entries. */
+struct port_group_members {
+    struct sset added;
+    struct sset deleted;
+    struct sset ports;
+};
+
+/* 'port_groups_cs' represeints local ports symtab entries. This is used when
+   when AS members updated i.e AS member is parsed in conjuction with all
+ * symtab entries in 'port_groups_cs'.
+ * 'members_deleted_cs' and 'members_added_cs' represent symtab entries for
+ * the deleted or added ports. When a local port added to PG or deleted from PG
+ * these symtab entries genered and later used in corresponding flow calculation.
+ * 'new', 'deleted' and 'updated' sset will represent the corresponding PGs*/
+struct ed_type_port_groups{
+    struct shash port_groups_cs;
+    bool change_tracked;
+
+    /* Tracked data */
+    struct sset new;
+    struct sset deleted;
+    struct sset updated;    
+    struct shash members_updated;
+    struct shash members_deleted_cs;
+    struct shash members_added_cs;     
+    
+    bool tracked;
+};
+static void
+initialise_port_group_members(struct shash *, const char *,
+                              struct sset *);
+
+
 /* Pending packet to be injected into connected OVS. */
 struct pending_pkt {
     /* Setting 'conn' indicates that a request is pending. */
@@ -374,72 +433,270 @@  get_ovs_chassis_id(const struct ovsrec_open_vswitch_table *ovs_table)
  * corresponding symtab entries as necessary. */
 static void
 addr_sets_init(const struct sbrec_address_set_table *address_set_table,
-               struct shash *addr_sets)
+               struct shash *addr_sets, struct shash *members)
 {
     const struct sbrec_address_set *as;
     SBREC_ADDRESS_SET_TABLE_FOR_EACH (as, address_set_table) {
         expr_const_sets_add(addr_sets, as->name,
                             (const char *const *) as->addresses,
                             as->n_addresses, true);
+        struct sset *new_addresses = xmalloc(sizeof *new_addresses);
+        sset_init(new_addresses);
+        for (int i=0; i<as->n_addresses; i++) {
+            sset_add(new_addresses, as->addresses[i]);
+        }
+        shash_add(members, as->name, new_addresses);
     }
 }
 
+static void
+addr_set_member_update(void *data, const char *as_name,
+                         const char *const *as_addresses, size_t as_n_addresses)
+{   
+    int count;    
+    struct ed_type_addr_sets *as_data = data;
+    struct sset deleted = SSET_INITIALIZER(&deleted);
+    struct sset added = SSET_INITIALIZER(&added);
+
+    const char *address;
+    struct sset *members =
+        shash_find_data(&as_data->members, as_name);
+    if (members) {                
+        SSET_FOR_EACH(address, members) {                                        
+            for (count=0; count<as_n_addresses; count++) {
+                if (!strcmp(address, as_addresses[count])) {
+                    break;
+                }
+            }
+                        
+            if (count == as_n_addresses) {
+                if (sset_find_and_delete(members, address)) {
+                    sset_add(&deleted, address);                       
+                }
+            }
+        }
+        
+        /* Add new addresses */
+        for (count=0; count<as_n_addresses; count++) {
+            if (!sset_find(members, as_addresses[count])) {                               
+                sset_add(members, as_addresses[count]);
+                sset_add(&added, as_addresses[count]);                                                         
+            }
+        }
+        if (sset_is_empty(members)) {
+            shash_find_and_delete(&as_data->members, as_name);
+            sset_destroy(members);
+            free(members);
+        }
+    } else {
+            /* Add new addreses */            
+            for (count=0; count<as_n_addresses; count++) {
+                sset_add(&added, as_addresses[count]);
+            }
+
+            struct sset *new_addresses = xmalloc(sizeof *new_addresses);
+            sset_init(new_addresses);
+            sset_clone(new_addresses,  &added);
+            shash_add(&as_data->members, as_name, new_addresses);
+    }
+
+    int num_deleted = sset_count(&deleted);
+    int num_added = sset_count(&added);
+
+    if (num_added) {
+        const char **added_arr = sset_array(&added);
+        expr_const_sets_add(&as_data->members_added, as_name,
+                    (const char *const *) added_arr,
+                    num_added, true);
+        free(added_arr);
+    }
+
+    if (num_deleted) {
+        const char **deleted_arr = sset_array(&deleted);
+        expr_const_sets_add(&as_data->members_deleted, as_name,
+                    (const char *const *) deleted_arr,
+                    num_deleted, true);
+        free(deleted_arr);
+    }             
+    sset_destroy(&deleted);
+    sset_destroy(&added);
+}
+
 static void
 addr_sets_update(const struct sbrec_address_set_table *address_set_table,
-                 struct shash *addr_sets, struct sset *new,
-                 struct sset *deleted, struct sset *updated)
+                 struct ed_type_addr_sets *as_data)
 {
     const struct sbrec_address_set *as;
     SBREC_ADDRESS_SET_TABLE_FOR_EACH_TRACKED (as, address_set_table) {
         if (sbrec_address_set_is_deleted(as)) {
-            expr_const_sets_remove(addr_sets, as->name);
-            sset_add(deleted, as->name);
+            expr_const_sets_remove(&as_data->addr_sets, as->name);
+            sset_add(&as_data->deleted, as->name);
         } else {
-            expr_const_sets_add(addr_sets, as->name,
+            expr_const_sets_add(&as_data->addr_sets, as->name,
                                 (const char *const *) as->addresses,
                                 as->n_addresses, true);
             if (sbrec_address_set_is_new(as)) {
-                sset_add(new, as->name);
+                sset_add(&as_data->new, as->name);
             } else {
-                sset_add(updated, as->name);
+                sset_add(&as_data->updated, as->name);
             }
+            addr_set_member_update(as_data, as->name,
+                                   (const char *const *) as->addresses,
+                                   as->n_addresses);
         }
     }
 }
 
 /* Iterate port groups in the southbound database.  Create and update the
- * corresponding symtab entries as necessary. */
+ * corresponding symtab entries as necessary. Symtab entries will be created
+ * for only local ports of the port group. Also store local ports of port group
+ * to identify updated ports during PG update*/
  static void
 port_groups_init(const struct sbrec_port_group_table *port_group_table,
-                 struct shash *port_groups)
+                struct ed_type_port_groups *pg_data, struct sset *local_lports)                 
 {
     const struct sbrec_port_group *pg;
     SBREC_PORT_GROUP_TABLE_FOR_EACH (pg, port_group_table) {
-        expr_const_sets_add(port_groups, pg->name,
-                            (const char *const *) pg->ports,
-                            pg->n_ports, false);
+        struct sset new_ports = SSET_INITIALIZER(&new_ports);
+        for (int i=0; i<pg->n_ports; i++) {
+            if (sset_find(local_lports, pg->ports[i])) {                                
+                sset_add(&new_ports, pg->ports[i]);              
+            }
+        }
+        int new_ports_count = sset_count(&new_ports);
+        if (new_ports_count) {
+            const char **added_arr = sset_array(&new_ports);
+            expr_const_sets_add(&pg_data->port_groups_cs, pg->name,
+                                (const char *const *) added_arr,
+                                new_ports_count, false);
+            free(added_arr);   
+            initialise_port_group_members(&pg_data->members_updated,
+                                          pg->name, &new_ports);
+        }        
+        sset_destroy(&new_ports);
+    }
+}
+
+/* Store local ports of port groups. This is used to identify added or deleted
+ * ports and build corresponding symtab entries. Later these symbtab entries
+ * are used during flow parsing resulting in removing or adding of flows
+ * for only updated ports. */
+static void
+port_group_member_update(struct ed_type_port_groups *pg_data, const char *pg_name,
+                         const char *const *pg_ports, size_t pg_n_ports,
+                         struct sset *local_lports)
+{   
+    int count;
+
+    const char *port;
+    struct port_group_members *members =
+        shash_find_data(&pg_data->members_updated, pg_name);
+    if (members) {
+        /* Prepare deleted ports if port got removed from PG or Chassis*/
+        SSET_FOR_EACH(port, &members->ports) {                                        
+            /* Port removed from chassis*/
+            if (!sset_find(local_lports, port)) {
+                sset_add(&members->deleted, port);                
+                sset_find_and_delete(&members->ports, port);
+                continue;
+            }
+
+            /* Local port removed from PG */
+            for (count=0; count<pg_n_ports; count++) {
+                if (!strcmp(port, pg_ports[count])) {
+                    break;
+                }
+            }
+                        
+            if (count == pg_n_ports) {
+                if (sset_find_and_delete(&members->ports, port)) {
+                    sset_add(&members->deleted, port);                       
+                }
+            }
+        }
+
+        /* Add new ports */
+        for (count=0; count<pg_n_ports; count++) {
+            if (!sset_find(&members->ports, pg_ports[count])) {
+                /* Add if it is a localport */
+                if (sset_find(local_lports, pg_ports[count])) {
+                    sset_add(&members->ports, pg_ports[count]);
+                    sset_add(&members->added, pg_ports[count]);                         
+                }                
+            }
+        }
+    } else {
+        /* Add new ports */
+            struct sset new_ports = SSET_INITIALIZER(&new_ports);
+            for (count=0; count<pg_n_ports; count++) {
+                if (sset_find(local_lports, pg_ports[count])) {                                
+                    sset_add(&new_ports, pg_ports[count]);              
+                }
+            }
+            int new_ports_count = sset_count(&new_ports);
+            if (new_ports_count) {
+                initialise_port_group_members(&pg_data->members_updated,
+                                              pg_name, &new_ports);
+                members = shash_find_data(&pg_data->members_updated, pg_name);
+                sset_clone(&members->added,  &members->ports);
+            }        
+            sset_destroy(&new_ports);     
+    }
+    
+    if (members) {
+        int deleted = sset_count(&members->deleted);
+        int added = sset_count(&members->added);
+
+        if (added) {
+            VLOG_DBG("%d members added for port group %s",
+                 added, pg_name);
+            const char **added_arr = sset_array(&members->added);
+            expr_const_sets_add(&pg_data->members_added_cs, pg_name,
+                        (const char *const *) added_arr,
+                        added, false);
+            free(added_arr);
+        }
+
+        if (deleted) {
+            VLOG_DBG("%d members deleted for port group %s",
+                 deleted, pg_name);
+            const char **deleted_arr = sset_array(&members->deleted);
+            expr_const_sets_add(&pg_data->members_deleted_cs, pg_name,
+                        (const char *const *) deleted_arr,
+                        deleted, false);
+            free(deleted_arr);
+        } 
+
+        if (added || deleted) {
+            const char **added_arr = sset_array(&members->ports);
+            expr_const_sets_add(&pg_data->port_groups_cs, pg_name,
+                        (const char *const *) added_arr,
+                        sset_count(&members->ports), false);
+            free(added_arr);            
+        }          
     }
 }
 
 static void
 port_groups_update(const struct sbrec_port_group_table *port_group_table,
-                   struct shash *port_groups, struct sset *new,
-                   struct sset *deleted, struct sset *updated)
+                   struct ed_type_port_groups *pg_data,
+                   struct sset *local_lports)
 {
     const struct sbrec_port_group *pg;
     SBREC_PORT_GROUP_TABLE_FOR_EACH_TRACKED (pg, port_group_table) {
         if (sbrec_port_group_is_deleted(pg)) {
-            expr_const_sets_remove(port_groups, pg->name);
-            sset_add(deleted, pg->name);
-        } else {
-            expr_const_sets_add(port_groups, pg->name,
-                                (const char *const *) pg->ports,
-                                pg->n_ports, false);
+            expr_const_sets_remove(&pg_data->port_groups_cs, pg->name);
+            sset_add(&pg_data->deleted, pg->name);
+        } else {               
             if (sbrec_port_group_is_new(pg)) {
-                sset_add(new, pg->name);
+                sset_add(&pg_data->new, pg->name);
             } else {
-                sset_add(updated, pg->name);
+                sset_add(&pg_data->updated, pg->name);
             }
+            port_group_member_update(pg_data, pg->name,
+                                     (const char *const *) pg->ports,
+                                     pg->n_ports, local_lports); 
+            pg_data->tracked = true;           
         }
     }
 }
@@ -824,14 +1081,6 @@  en_ofctrl_is_connected_run(struct engine_node *node, void *data)
     engine_set_node_state(node, EN_VALID);
 }
 
-struct ed_type_addr_sets {
-    struct shash addr_sets;
-    bool change_tracked;
-    struct sset new;
-    struct sset deleted;
-    struct sset updated;
-};
-
 static void *
 en_addr_sets_init(struct engine_node *node OVS_UNUSED,
                   struct engine_arg *arg OVS_UNUSED)
@@ -839,10 +1088,14 @@  en_addr_sets_init(struct engine_node *node OVS_UNUSED,
     struct ed_type_addr_sets *as = xzalloc(sizeof *as);
 
     shash_init(&as->addr_sets);
+    shash_init(&as->members_added);
+    shash_init(&as->members_deleted);
+    shash_init(&as->members);
     as->change_tracked = false;
     sset_init(&as->new);
     sset_init(&as->deleted);
     sset_init(&as->updated);
+
     return as;
 }
 
@@ -851,10 +1104,21 @@  en_addr_sets_cleanup(void *data)
 {
     struct ed_type_addr_sets *as = data;
     expr_const_sets_destroy(&as->addr_sets);
+    expr_const_sets_destroy(&as->members_added);
+    expr_const_sets_destroy(&as->members_deleted);
+    shash_destroy(&as->members_added);
+    shash_destroy(&as->members_deleted);
     shash_destroy(&as->addr_sets);
     sset_destroy(&as->new);
     sset_destroy(&as->deleted);
     sset_destroy(&as->updated);
+
+    struct shash_node *node, *next;
+    SHASH_FOR_EACH_SAFE (node, next, &as->members) {        
+        sset_destroy(node->data);
+        free(node->data);
+        shash_delete(&as->members, node);        
+    }    
 }
 
 static void
@@ -866,12 +1130,21 @@  en_addr_sets_run(struct engine_node *node, void *data)
     sset_clear(&as->deleted);
     sset_clear(&as->updated);
     expr_const_sets_destroy(&as->addr_sets);
+    expr_const_sets_destroy(&as->members_added);
+    expr_const_sets_destroy(&as->members_deleted);
+
+    struct shash_node *sh_node, *next;
+    SHASH_FOR_EACH_SAFE (sh_node, next, &as->members) {        
+        sset_destroy(sh_node->data);
+        free(sh_node->data);
+        shash_delete(&as->members, sh_node);        
+    }    
 
     struct sbrec_address_set_table *as_table =
         (struct sbrec_address_set_table *)EN_OVSDB_GET(
             engine_get_input("SB_address_set", node));
 
-    addr_sets_init(as_table, &as->addr_sets);
+    addr_sets_init(as_table, &as->addr_sets, &as->members);
 
     as->change_tracked = false;
     engine_set_node_state(node, EN_UPDATED);
@@ -885,13 +1158,14 @@  addr_sets_sb_address_set_handler(struct engine_node *node, void *data)
     sset_clear(&as->new);
     sset_clear(&as->deleted);
     sset_clear(&as->updated);
+    expr_const_sets_destroy(&as->members_added);
+    expr_const_sets_destroy(&as->members_deleted);
 
     struct sbrec_address_set_table *as_table =
         (struct sbrec_address_set_table *)EN_OVSDB_GET(
             engine_get_input("SB_address_set", node));
 
-    addr_sets_update(as_table, &as->addr_sets, &as->new,
-                     &as->deleted, &as->updated);
+    addr_sets_update(as_table, as);
 
     if (!sset_is_empty(&as->new) || !sset_is_empty(&as->deleted) ||
             !sset_is_empty(&as->updated)) {
@@ -904,13 +1178,71 @@  addr_sets_sb_address_set_handler(struct engine_node *node, void *data)
     return true;
 }
 
-struct ed_type_port_groups{
-    struct shash port_groups;
-    bool change_tracked;
-    struct sset new;
-    struct sset deleted;
-    struct sset updated;
-};
+static void
+initialise_port_group_members(struct shash *members_updated,
+                              const char *pg_name,
+                              struct sset *new_ports)
+{
+    struct port_group_members *members = xmalloc(sizeof *members);    
+    shash_add(members_updated, pg_name, members);
+    sset_init(&members->added);
+    sset_init(&members->deleted);
+    sset_init(&members->ports);   
+
+    sset_swap(&members->ports, new_ports);
+}
+
+static void
+clear_port_group_members(struct shash *members_updated)
+{
+    /* only clear added and deleted sset. ports will be retained
+       and used in engine run to generate addeded and deleted sset */
+    struct shash_node *node, *next;
+    SHASH_FOR_EACH_SAFE (node, next, members_updated) {
+        struct port_group_members *members = node->data;
+        if (sset_is_empty(&members->ports)) {
+            sset_destroy(&members->ports);
+            sset_destroy(&members->added);
+            sset_destroy(&members->deleted);
+            free(members);
+            shash_delete(members_updated, node);
+        } else {
+            sset_clear(&members->added);
+            sset_clear(&members->deleted);   
+        }
+    }
+}
+
+static void
+destroy_port_group_members(struct shash *members_updated)
+{
+    struct shash_node *node, *next;
+    SHASH_FOR_EACH_SAFE (node, next, members_updated) {
+        struct port_group_members *members = node->data;
+        sset_destroy(&members->ports);
+        sset_destroy(&members->added);
+        sset_destroy(&members->deleted);
+        free(members);
+        shash_delete(members_updated, node);        
+    }
+}
+
+static void
+en_port_groups_clear_tracked_data(void *data_)
+{
+    struct ed_type_port_groups *pg = data_;
+
+    sset_clear(&pg->new);
+    sset_clear(&pg->deleted);
+    sset_clear(&pg->updated);
+
+    clear_port_group_members(&pg->members_updated);
+    
+    expr_const_sets_destroy(&pg->members_deleted_cs);
+    expr_const_sets_destroy(&pg->members_added_cs);
+
+    pg->tracked = false;
+}
 
 static void *
 en_port_groups_init(struct engine_node *node OVS_UNUSED,
@@ -918,11 +1250,16 @@  en_port_groups_init(struct engine_node *node OVS_UNUSED,
 {
     struct ed_type_port_groups *pg = xzalloc(sizeof *pg);
 
-    shash_init(&pg->port_groups);
+    shash_init(&pg->port_groups_cs);
+    shash_init(&pg->members_updated);
     pg->change_tracked = false;
+    shash_init(&pg->members_deleted_cs);
+    shash_init(&pg->members_added_cs);
     sset_init(&pg->new);
     sset_init(&pg->deleted);
     sset_init(&pg->updated);
+    pg->tracked = false;
+
     return pg;
 }
 
@@ -930,49 +1267,142 @@  static void
 en_port_groups_cleanup(void *data)
 {
     struct ed_type_port_groups *pg = data;
-    expr_const_sets_destroy(&pg->port_groups);
-    shash_destroy(&pg->port_groups);
+    expr_const_sets_destroy(&pg->port_groups_cs);    
+    shash_destroy(&pg->port_groups_cs);
+
+    destroy_port_group_members(&pg->members_updated);
+    en_port_groups_clear_tracked_data(pg);
+    shash_destroy(&pg->members_deleted_cs);
+    shash_destroy(&pg->members_added_cs); 
+    shash_destroy(&pg->members_updated);
+
     sset_destroy(&pg->new);
     sset_destroy(&pg->deleted);
     sset_destroy(&pg->updated);
 }
 
+struct ed_type_runtime_data {
+    /* Contains "struct local_datapath" nodes. */
+    struct hmap local_datapaths;
+
+    /* Contains "struct local_binding" nodes. */
+    struct shash local_bindings;
+
+    /* Contains the name of each logical port resident on the local
+     * hypervisor.  These logical ports include the VIFs (and their child
+     * logical ports, if any) that belong to VMs running on the hypervisor,
+     * l2gateway ports for which options:l2gateway-chassis designates the
+     * local hypervisor, and localnet ports. */
+    struct sset local_lports;
+
+    /* Contains the same ports as local_lports, but in the format:
+     * <datapath-tunnel-key>_<port-tunnel-key> */
+    struct sset local_lport_ids;
+    struct sset active_tunnels;
+
+    /* runtime data engine private data. */
+    struct sset egress_ifaces;
+    struct smap local_iface_ids;
+
+    /* Tracked data. See below for more details and comments. */
+    bool tracked;
+    bool local_lports_changed;
+    struct hmap tracked_dp_bindings;
+};
+
 static void
 en_port_groups_run(struct engine_node *node, void *data)
 {
     struct ed_type_port_groups *pg = data;
 
-    sset_clear(&pg->new);
-    sset_clear(&pg->deleted);
-    sset_clear(&pg->updated);
-    expr_const_sets_destroy(&pg->port_groups);
+    destroy_port_group_members(&pg->members_updated);
+    en_port_groups_clear_tracked_data(pg);
+
+    expr_const_sets_destroy(&pg->port_groups_cs);
+
+    struct ed_type_runtime_data *rt_data =
+        engine_get_input_data("runtime_data", node);            
 
     struct sbrec_port_group_table *pg_table =
         (struct sbrec_port_group_table *)EN_OVSDB_GET(
             engine_get_input("SB_port_group", node));
 
-    port_groups_init(pg_table, &pg->port_groups);
+    port_groups_init(pg_table, pg, &rt_data->local_lports);
 
     pg->change_tracked = false;
     engine_set_node_state(node, EN_UPDATED);
 }
 
+/* When a port got added or deleted from this chassis, identify
+ * it's port groups and reprocess the related flows */
+static bool
+port_groups_runtime_data_handler(struct engine_node *node, void *data)
+{
+    struct ed_type_port_groups *pg_data = data;
+
+    /* If SB_port_group and runtime binding changed simultaneously */
+    if (node->state != EN_UPDATED) {        
+        engine_set_node_state(node, EN_VALID);
+    }
+
+    struct ed_type_runtime_data *rt_data =
+        engine_get_input_data("runtime_data", node);
+
+    if (!rt_data->tracked) {        
+        return true;
+    }
+
+    struct hmap *tracked_dp_bindings = &rt_data->tracked_dp_bindings;
+    if (hmap_is_empty(tracked_dp_bindings)) {
+        return true;
+    }
+
+    struct sbrec_port_group_table *pg_table =
+        (struct sbrec_port_group_table *)EN_OVSDB_GET(
+            engine_get_input("SB_port_group", node));    
+    
+    pg_data->tracked = true;
+    
+    struct tracked_binding_datapath *tdp;
+    HMAP_FOR_EACH (tdp, node, tracked_dp_bindings) {
+        struct shash_node *shash_node;
+        SHASH_FOR_EACH (shash_node, &tdp->lports) {
+            struct tracked_binding_lport *lport = shash_node->data;            
+            const struct sbrec_port_group *pg;
+            SBREC_PORT_GROUP_TABLE_FOR_EACH (pg, pg_table) {
+                for (int i=0; i<pg->n_ports; i++) {
+                    if (!strcmp(lport->pb->logical_port, pg->ports[i])) {
+                        /* check port removed from chassis or added to chassis */                           
+                        sset_add(&pg_data->updated, pg->name);
+                        port_group_member_update(pg_data, pg->name,
+                                                 (const char *const *)pg->ports,
+                                                 pg->n_ports,
+                                                 &rt_data->local_lports);                        
+                        engine_set_node_state(node, EN_UPDATED);
+                        break;
+                    }
+                }
+            }
+        }
+        
+    }    
+    return true;    
+}
+
 static bool
 port_groups_sb_port_group_handler(struct engine_node *node, void *data)
 {
     struct ed_type_port_groups *pg = data;
 
-    sset_clear(&pg->new);
-    sset_clear(&pg->deleted);
-    sset_clear(&pg->updated);
-
     struct sbrec_port_group_table *pg_table =
         (struct sbrec_port_group_table *)EN_OVSDB_GET(
             engine_get_input("SB_port_group", node));
 
-    port_groups_update(pg_table, &pg->port_groups, &pg->new,
-                     &pg->deleted, &pg->updated);
+    struct ed_type_runtime_data *rt_data =
+        engine_get_input_data("runtime_data", node);   
 
+    port_groups_update(pg_table, pg, &rt_data->local_lports);
+    
     if (!sset_is_empty(&pg->new) || !sset_is_empty(&pg->deleted) ||
             !sset_is_empty(&pg->updated)) {
         engine_set_node_state(node, EN_UPDATED);
@@ -984,35 +1414,6 @@  port_groups_sb_port_group_handler(struct engine_node *node, void *data)
     return true;
 }
 
-struct ed_type_runtime_data {
-    /* Contains "struct local_datapath" nodes. */
-    struct hmap local_datapaths;
-
-    /* Contains "struct local_binding" nodes. */
-    struct shash local_bindings;
-
-    /* Contains the name of each logical port resident on the local
-     * hypervisor.  These logical ports include the VIFs (and their child
-     * logical ports, if any) that belong to VMs running on the hypervisor,
-     * l2gateway ports for which options:l2gateway-chassis designates the
-     * local hypervisor, and localnet ports. */
-    struct sset local_lports;
-
-    /* Contains the same ports as local_lports, but in the format:
-     * <datapath-tunnel-key>_<port-tunnel-key> */
-    struct sset local_lport_ids;
-    struct sset active_tunnels;
-
-    /* runtime data engine private data. */
-    struct sset egress_ifaces;
-    struct smap local_iface_ids;
-
-    /* Tracked data. See below for more details and comments. */
-    bool tracked;
-    bool local_lports_changed;
-    struct hmap tracked_dp_bindings;
-};
-
 /* struct ed_type_runtime_data has the below members for tracking the
  * changes done to the runtime_data engine by the runtime_data engine
  * handlers. Since this engine is an input to the flow_output engine,
@@ -1661,10 +2062,15 @@  static void init_lflow_ctx(struct engine_node *node,
     struct ed_type_addr_sets *as_data =
         engine_get_input_data("addr_sets", node);
     struct shash *addr_sets = &as_data->addr_sets;
+    struct shash *address_set_members_added = &as_data->members_added;
+    struct shash *address_set_members_deleted = &as_data->members_deleted;
 
     struct ed_type_port_groups *pg_data =
         engine_get_input_data("port_groups", node);
-    struct shash *port_groups = &pg_data->port_groups;
+    struct shash *port_groups = &pg_data->port_groups_cs;
+    struct shash *port_group_members_added = &pg_data->members_added_cs;
+    struct shash *port_group_members_deleted = &pg_data->members_deleted_cs;
+    struct hmap *tracked_dp_bindings = &rt_data->tracked_dp_bindings;
 
     l_ctx_in->sbrec_multicast_group_by_name_datapath =
         sbrec_mc_group_by_name_dp;
@@ -1679,7 +2085,12 @@  static void init_lflow_ctx(struct engine_node *node,
     l_ctx_in->chassis = chassis;
     l_ctx_in->local_datapaths = &rt_data->local_datapaths;
     l_ctx_in->addr_sets = addr_sets;
+    l_ctx_in->address_set_members_added = address_set_members_added;
+    l_ctx_in->address_set_members_deleted = address_set_members_deleted;
     l_ctx_in->port_groups = port_groups;
+    l_ctx_in->port_group_members_added = port_group_members_added;
+    l_ctx_in->port_group_members_deleted = port_group_members_deleted;
+    l_ctx_in->tracked_dp_bindings = tracked_dp_bindings;
     l_ctx_in->active_tunnels = &rt_data->active_tunnels;
     l_ctx_in->local_lport_ids = &rt_data->local_lport_ids;
 
@@ -1700,7 +2111,7 @@  en_flow_output_init(struct engine_node *node OVS_UNUSED,
     ovn_extend_table_init(&data->group_table);
     ovn_extend_table_init(&data->meter_table);
     data->conj_id_ofs = 1;
-    lflow_resource_init(&data->lflow_resource_ref);
+    lflow_resource_init(&data->lflow_resource_ref);    
     return data;
 }
 
@@ -1711,7 +2122,7 @@  en_flow_output_cleanup(void *data)
     ovn_desired_flow_table_destroy(&flow_output_data->flow_table);
     ovn_extend_table_destroy(&flow_output_data->group_table);
     ovn_extend_table_destroy(&flow_output_data->meter_table);
-    lflow_resource_destroy(&flow_output_data->lflow_resource_ref);
+    lflow_resource_destroy(&flow_output_data->lflow_resource_ref);    
 }
 
 static void
@@ -1937,7 +2348,7 @@  _flow_output_resource_ref_handler(struct engine_node *node, void *data,
 
     SSET_FOR_EACH (ref_name, deleted) {
         if (!lflow_handle_changed_ref(ref_type, ref_name, &l_ctx_in,
-                                      &l_ctx_out, &changed)) {
+                                      &l_ctx_out, &changed, REF_OP_DELETED)) {
             return false;
         }
         if (changed) {
@@ -1946,7 +2357,7 @@  _flow_output_resource_ref_handler(struct engine_node *node, void *data,
     }
     SSET_FOR_EACH (ref_name, updated) {
         if (!lflow_handle_changed_ref(ref_type, ref_name, &l_ctx_in,
-                                      &l_ctx_out, &changed)) {
+                                      &l_ctx_out, &changed, REF_OP_UPDATED)) {
             return false;
         }
         if (changed) {
@@ -1955,7 +2366,7 @@  _flow_output_resource_ref_handler(struct engine_node *node, void *data,
     }
     SSET_FOR_EACH (ref_name, new) {
         if (!lflow_handle_changed_ref(ref_type, ref_name, &l_ctx_in,
-                                      &l_ctx_out, &changed)) {
+                                      &l_ctx_out, &changed, REF_OP_ADDED)) {
             return false;
         }
         if (changed) {
@@ -1994,7 +2405,6 @@  flow_output_physical_flow_changes_handler(struct engine_node *node, void *data)
 
     if (pfc_data->recompute_physical_flows) {
         /* This indicates that we need to recompute the physical flows. */
-        physical_clear_unassoc_flows_with_db(&fo->flow_table);
         physical_run(&p_ctx, &fo->flow_table);
         return true;
     }
@@ -2191,7 +2601,7 @@  main(int argc, char *argv[])
                                       "physical_flow_changes");
     ENGINE_NODE(flow_output, "flow_output");
     ENGINE_NODE(addr_sets, "addr_sets");
-    ENGINE_NODE(port_groups, "port_groups");
+    ENGINE_NODE_WITH_CLEAR_TRACK_DATA(port_groups, "port_groups");
 
 #define SB_NODE(NAME, NAME_STR) ENGINE_NODE_SB(NAME, NAME_STR);
     SB_NODES
@@ -2205,8 +2615,7 @@  main(int argc, char *argv[])
 
     engine_add_input(&en_addr_sets, &en_sb_address_set,
                      addr_sets_sb_address_set_handler);
-    engine_add_input(&en_port_groups, &en_sb_port_group,
-                     port_groups_sb_port_group_handler);
+
 
     /* Engine node physical_flow_changes indicates whether
      * we can recompute only physical flows or we can
@@ -2268,6 +2677,11 @@  main(int argc, char *argv[])
     engine_add_input(&en_runtime_data, &en_sb_port_binding,
                      runtime_data_sb_port_binding_handler);
 
+    engine_add_input(&en_port_groups, &en_sb_port_group,
+                     port_groups_sb_port_group_handler);
+    engine_add_input(&en_port_groups, &en_runtime_data,
+                     port_groups_runtime_data_handler);
+
     struct engine_arg engine_arg = {
         .sb_idl = ovnsb_idl_loop.idl,
         .ovs_idl = ovs_idl_loop.idl,
@@ -2527,7 +2941,7 @@  main(int argc, char *argv[])
                     engine_get_data(&en_port_groups);
                 if (br_int && chassis && as_data && pg_data) {
                     char *error = ofctrl_inject_pkt(br_int, pending_pkt.flow_s,
-                        &as_data->addr_sets, &pg_data->port_groups);
+                        &as_data->addr_sets, &pg_data->port_groups_cs);
                     if (error) {
                         unixctl_command_reply_error(pending_pkt.conn, error);
                         free(error);