[ovs-dev,v1,3/3] datapath-windows: Optimize conntrack lock implementation.

Message ID 20171114184821.124-4-kumaranand@vmware.com
State New
Headers show
Series
  • datapath-windows: New lock implementation in conntrack
Related show

Commit Message

Anand Kumar Nov. 14, 2017, 6:48 p.m.
Currently, there is one global lock for conntrack module, which protects
conntrack entries and conntrack table. All the NAT operations are
performed holding this lock.

This becomes inefficient, as the number of conntrack entries grow.
With new implementation, we will have two PNDIS_RW_LOCK_EX locks in
conntrack.

1. ovsCtBucketLock - one rw lock per bucket of the conntrack table,
which is shared by all the ct entries that belong to the same bucket.
2. lock -  a rw lock in OVS_CT_ENTRY structure that protects the members
of conntrack entry.

Also, OVS_CT_ENTRY structure will have a lock reference(bucketLockRef)
to the corresponding OvsCtBucketLock of conntrack table.
We need this reference to retrieve ovsCtBucketLock from ct entry
for delete operation.

Signed-off-by: Anand Kumar <kumaranand@vmware.com>
---
 datapath-windows/ovsext/Conntrack-nat.c |   6 +
 datapath-windows/ovsext/Conntrack.c     | 231 ++++++++++++++++++++------------
 datapath-windows/ovsext/Conntrack.h     |   3 +
 3 files changed, 154 insertions(+), 86 deletions(-)

Patch

diff --git a/datapath-windows/ovsext/Conntrack-nat.c b/datapath-windows/ovsext/Conntrack-nat.c
index 7975770..33a86cf 100644
--- a/datapath-windows/ovsext/Conntrack-nat.c
+++ b/datapath-windows/ovsext/Conntrack-nat.c
@@ -167,12 +167,16 @@  OvsNatPacket(OvsForwardingContext *ovsFwdCtx,
 {
     UINT32 natFlag;
     const struct ct_endpoint* endpoint;
+    LOCK_STATE_EX lockState;
+    /* XXX Move conntrack locks out of NAT after implementing lock in NAT. */
+    NdisAcquireRWLockRead(entry->lock, &lockState, 0);
     /* When it is NAT, only entry->rev_key contains NATTED address;
        When it is unNAT, only entry->key contains the UNNATTED address;*/
     const OVS_CT_KEY *ctKey = reverse ? &entry->key : &entry->rev_key;
     BOOLEAN isSrcNat;
 
     if (!(natAction & (NAT_ACTION_SRC | NAT_ACTION_DST))) {
+        NdisReleaseRWLock(entry->lock, &lockState);
         return;
     }
     isSrcNat = (((natAction & NAT_ACTION_SRC) && !reverse) ||
@@ -202,6 +206,7 @@  OvsNatPacket(OvsForwardingContext *ovsFwdCtx,
         }
     } else if (ctKey->dl_type == htons(ETH_TYPE_IPV6)){
         // XXX: IPv6 packet not supported yet.
+        NdisReleaseRWLock(entry->lock, &lockState);
         return;
     }
     if (natAction & (NAT_ACTION_SRC_PORT | NAT_ACTION_DST_PORT)) {
@@ -215,6 +220,7 @@  OvsNatPacket(OvsForwardingContext *ovsFwdCtx,
             }
         }
     }
+    NdisReleaseRWLock(entry->lock, &lockState);
 }
 
 
diff --git a/datapath-windows/ovsext/Conntrack.c b/datapath-windows/ovsext/Conntrack.c
index ba0dc88..f5e4996 100644
--- a/datapath-windows/ovsext/Conntrack.c
+++ b/datapath-windows/ovsext/Conntrack.c
@@ -31,7 +31,7 @@ 
 KSTART_ROUTINE OvsConntrackEntryCleaner;
 static PLIST_ENTRY ovsConntrackTable;
 static OVS_CT_THREAD_CTX ctThreadCtx;
-static PNDIS_RW_LOCK_EX ovsConntrackLockObj;
+static PNDIS_RW_LOCK_EX *ovsCtBucketLock;
 static PNDIS_RW_LOCK_EX ovsCtNatLockObj;
 extern POVS_SWITCH_CONTEXT gOvsSwitchContext;
 static LONG ctTotalEntries;
@@ -47,20 +47,13 @@  static __inline NDIS_STATUS OvsCtFlush(UINT16 zone);
 NTSTATUS
 OvsInitConntrack(POVS_SWITCH_CONTEXT context)
 {
-    NTSTATUS status;
+    NTSTATUS status = STATUS_SUCCESS;
     HANDLE threadHandle = NULL;
     ctTotalEntries = 0;
 
     /* Init the sync-lock */
-    ovsConntrackLockObj = NdisAllocateRWLock(context->NdisFilterHandle);
-    if (ovsConntrackLockObj == NULL) {
-        return STATUS_INSUFFICIENT_RESOURCES;
-    }
-
     ovsCtNatLockObj = NdisAllocateRWLock(context->NdisFilterHandle);
     if (ovsCtNatLockObj == NULL) {
-        NdisFreeRWLock(ovsConntrackLockObj);
-        ovsConntrackLockObj = NULL;
         return STATUS_INSUFFICIENT_RESOURCES;
     }
 
@@ -69,15 +62,26 @@  OvsInitConntrack(POVS_SWITCH_CONTEXT context)
                                                  * CT_HASH_TABLE_SIZE,
                                                  OVS_CT_POOL_TAG);
     if (ovsConntrackTable == NULL) {
-        NdisFreeRWLock(ovsConntrackLockObj);
-        ovsConntrackLockObj = NULL;
         NdisFreeRWLock(ovsCtNatLockObj);
         ovsCtNatLockObj = NULL;
         return STATUS_INSUFFICIENT_RESOURCES;
     }
 
-    for (int i = 0; i < CT_HASH_TABLE_SIZE; i++) {
+    ovsCtBucketLock = OvsAllocateMemoryWithTag(sizeof(PNDIS_RW_LOCK_EX)
+                                               * CT_HASH_TABLE_SIZE,
+                                               OVS_CT_POOL_TAG);
+    if (ovsCtBucketLock == NULL) {
+        status = STATUS_INSUFFICIENT_RESOURCES;
+        goto freeTable;
+    }
+
+    for (UINT32 i = 0; i < CT_HASH_TABLE_SIZE; i++) {
         InitializeListHead(&ovsConntrackTable[i]);
+        ovsCtBucketLock[i] = NdisAllocateRWLock(context->NdisFilterHandle);
+        if (ovsCtBucketLock[i] == NULL) {
+            status = STATUS_INSUFFICIENT_RESOURCES;
+            goto freeBucketLock;
+        }
     }
 
     /* Init CT Cleaner Thread */
@@ -87,16 +91,7 @@  OvsInitConntrack(POVS_SWITCH_CONTEXT context)
                                   &ctThreadCtx);
 
     if (status != STATUS_SUCCESS) {
-        NdisFreeRWLock(ovsConntrackLockObj);
-        ovsConntrackLockObj = NULL;
-
-        NdisFreeRWLock(ovsCtNatLockObj);
-        ovsCtNatLockObj = NULL;
-
-        OvsFreeMemoryWithTag(ovsConntrackTable, OVS_CT_POOL_TAG);
-        ovsConntrackTable = NULL;
-
-        return status;
+        goto freeBucketLock;
     }
 
     ObReferenceObjectByHandle(threadHandle, SYNCHRONIZE, NULL, KernelMode,
@@ -104,13 +99,28 @@  OvsInitConntrack(POVS_SWITCH_CONTEXT context)
     ZwClose(threadHandle);
     threadHandle = NULL;
 
-    status = OvsNatInit();
+    status = OvsNatInit(context);
 
     if (status != STATUS_SUCCESS) {
         OvsCleanupConntrack();
-        return status;
+        goto freeBucketLock;
     }
     return STATUS_SUCCESS;
+
+freeBucketLock:
+    for (UINT32 i = 0; i < CT_HASH_TABLE_SIZE; i++) {
+        if (ovsCtBucketLock[i] != NULL) {
+            NdisFreeRWLock(ovsCtBucketLock[i]);
+        }
+    }
+    OvsFreeMemoryWithTag(ovsCtBucketLock, OVS_CT_POOL_TAG);
+    ovsCtBucketLock = NULL;
+freeTable:
+    OvsFreeMemoryWithTag(ovsConntrackTable, OVS_CT_POOL_TAG);
+    ovsConntrackTable = NULL;
+    NdisFreeRWLock(ovsCtNatLockObj);
+    ovsCtNatLockObj = NULL;
+    return status;
 }
 
 /*
@@ -122,12 +132,9 @@  OvsInitConntrack(POVS_SWITCH_CONTEXT context)
 VOID
 OvsCleanupConntrack(VOID)
 {
-    LOCK_STATE_EX lockState, lockStateNat;
-    NdisAcquireRWLockWrite(ovsConntrackLockObj, &lockState, 0);
+    LOCK_STATE_EX lockStateNat;
     ctThreadCtx.exit = 1;
     KeSetEvent(&ctThreadCtx.event, 0, FALSE);
-    NdisReleaseRWLock(ovsConntrackLockObj, &lockState);
-
     KeWaitForSingleObject(ctThreadCtx.threadObject, Executive,
                           KernelMode, FALSE, NULL);
     ObDereferenceObject(ctThreadCtx.threadObject);
@@ -140,8 +147,14 @@  OvsCleanupConntrack(VOID)
         ovsConntrackTable = NULL;
     }
 
-    NdisFreeRWLock(ovsConntrackLockObj);
-    ovsConntrackLockObj = NULL;
+    for (UINT32 i = 0; i < CT_HASH_TABLE_SIZE; i++) {
+        if (ovsCtBucketLock[i] != NULL) {
+            NdisFreeRWLock(ovsCtBucketLock[i]);
+        }
+    }
+    OvsFreeMemoryWithTag(ovsCtBucketLock, OVS_CT_POOL_TAG);
+    ovsCtBucketLock = NULL;
+
     NdisAcquireRWLockWrite(ovsCtNatLockObj, &lockStateNat, 0);
     OvsNatCleanup();
     NdisReleaseRWLock(ovsCtNatLockObj, &lockStateNat);
@@ -177,11 +190,20 @@  OvsCtUpdateFlowKey(struct OvsFlowKey *key,
     }
 }
 
+/*
+ *----------------------------------------------------------------------------
+ * OvsPostCtEventEntry
+ *     Assumes ct entry lock is acquired
+ *     XXX Refactor OvsPostCtEvent() as it does not require ct entry lock.
+ *----------------------------------------------------------------------------
+ */
 static __inline VOID
 OvsPostCtEventEntry(POVS_CT_ENTRY entry, UINT8 type)
 {
     OVS_CT_EVENT_ENTRY ctEventEntry = {0};
     NdisMoveMemory(&ctEventEntry.entry, entry, sizeof(OVS_CT_ENTRY));
+    ctEventEntry.entry.lock = NULL;
+    ctEventEntry.entry.parent = NULL;
     ctEventEntry.type = type;
     OvsPostCtEvent(&ctEventEntry);
 }
@@ -189,6 +211,8 @@  OvsPostCtEventEntry(POVS_CT_ENTRY entry, UINT8 type)
 static __inline VOID
 OvsCtIncrementCounters(POVS_CT_ENTRY entry, BOOLEAN reply, PNET_BUFFER_LIST nbl)
 {
+    LOCK_STATE_EX lockState;
+    NdisAcquireRWLockWrite(entry->lock, &lockState, 0);
     if (reply) {
         entry->rev_key.byteCount+= OvsPacketLenNBL(nbl);
         entry->rev_key.packetCount++;
@@ -196,12 +220,15 @@  OvsCtIncrementCounters(POVS_CT_ENTRY entry, BOOLEAN reply, PNET_BUFFER_LIST nbl)
         entry->key.byteCount += OvsPacketLenNBL(nbl);
         entry->key.packetCount++;
     }
+    NdisReleaseRWLock(entry->lock, &lockState);
 }
 
 static __inline BOOLEAN
-OvsCtAddEntry(POVS_CT_ENTRY entry, OvsConntrackKeyLookupCtx *ctx,
+OvsCtAddEntry(POVS_SWITCH_CONTEXT switchContext, POVS_CT_ENTRY entry,
+              OvsConntrackKeyLookupCtx *ctx,
               PNAT_ACTION_INFO natInfo, UINT64 now)
 {
+    LOCK_STATE_EX lockState;
     NdisMoveMemory(&entry->key, &ctx->key, sizeof(OVS_CT_KEY));
     NdisMoveMemory(&entry->rev_key, &ctx->key, sizeof(OVS_CT_KEY));
     OvsCtKeyReverse(&entry->rev_key);
@@ -228,10 +255,19 @@  OvsCtAddEntry(POVS_CT_ENTRY entry, OvsConntrackKeyLookupCtx *ctx,
     }
 
     entry->timestampStart = now;
-    InsertHeadList(&ovsConntrackTable[ctx->hash & CT_HASH_TABLE_MASK],
+    entry->lock = NdisAllocateRWLock(switchContext->NdisFilterHandle);
+    if (entry->lock == NULL) {
+        OVS_LOG_ERROR("NdisAllocateRWLock failed : Insufficient resources");
+        return FALSE;
+    }
+    UINT32 bucketIdx = ctx->hash & CT_HASH_TABLE_MASK;
+    entry->bucketLockRef = ovsCtBucketLock[bucketIdx];
+    NdisAcquireRWLockWrite(ovsCtBucketLock[bucketIdx], &lockState, 0);
+    InsertHeadList(&ovsConntrackTable[bucketIdx],
                    &entry->link);
 
     InterlockedIncrement((LONG volatile *)&ctTotalEntries);
+    NdisReleaseRWLock(ovsCtBucketLock[bucketIdx], &lockState);
     return TRUE;
 }
 
@@ -253,7 +289,6 @@  OvsCtEntryCreate(OvsForwardingContext *fwdCtx,
 
     *entryCreated = FALSE;
     state |= OVS_CS_F_NEW;
-
     switch (ipProto) {
     case IPPROTO_TCP:
     {
@@ -301,11 +336,11 @@  OvsCtEntryCreate(OvsForwardingContext *fwdCtx,
     if (parentEntry != NULL && state != OVS_CS_F_INVALID) {
         state |= OVS_CS_F_RELATED;
     }
-
     if (state != OVS_CS_F_INVALID && commit) {
         if (entry) {
             entry->parent = parentEntry;
-            if (OvsCtAddEntry(entry, ctx, natInfo, currentTime)) {
+            if (OvsCtAddEntry(fwdCtx->switchContext, entry, ctx,
+                              natInfo, currentTime)) {
                 *entryCreated = TRUE;
             } else {
                 /* Unable to add entry to the list */
@@ -335,6 +370,8 @@  OvsCtUpdateEntry(OVS_CT_ENTRY* entry,
                  UINT64 now)
 {
     CT_UPDATE_RES status;
+    LOCK_STATE_EX lockState;
+    NdisAcquireRWLockWrite(entry->lock, &lockState, 0);
     switch (ipProto) {
     case IPPROTO_TCP:
     {
@@ -342,25 +379,32 @@  OvsCtUpdateEntry(OVS_CT_ENTRY* entry,
         const TCPHdr *tcp;
         tcp = OvsGetTcp(nbl, l4Offset, &tcpStorage);
         if (!tcp) {
-            status =  CT_UPDATE_INVALID;
+            status = CT_UPDATE_INVALID;
             break;
         }
-        status =  OvsConntrackUpdateTcpEntry(entry, tcp, nbl, reply, now);
+        status = OvsConntrackUpdateTcpEntry(entry, tcp, nbl, reply, now);
         break;
     }
     case IPPROTO_ICMP:
-        status =  OvsConntrackUpdateIcmpEntry(entry, reply, now);
+        status = OvsConntrackUpdateIcmpEntry(entry, reply, now);
         break;
     case IPPROTO_UDP:
-        status =  OvsConntrackUpdateOtherEntry(entry, reply, now);
+        status = OvsConntrackUpdateOtherEntry(entry, reply, now);
         break;
     default:
-        status =  CT_UPDATE_INVALID;
+        status = CT_UPDATE_INVALID;
         break;
     }
+    NdisReleaseRWLock(entry->lock, &lockState);
     return status;
 }
 
+/*
+ *----------------------------------------------------------------------------
+ * OvsCtEntryExpired
+ *     Assumes ct entry lock is acquired
+ *----------------------------------------------------------------------------
+ */
 static __inline BOOLEAN
 OvsCtEntryExpired(POVS_CT_ENTRY entry)
 {
@@ -375,6 +419,8 @@  OvsCtEntryDelete(POVS_CT_ENTRY entry, BOOLEAN forceDelete)
     if (entry == NULL) {
         return;
     }
+    LOCK_STATE_EX lockState;
+    NdisAcquireRWLockWrite(entry->lock, &lockState, 0);
     if (forceDelete || OvsCtEntryExpired(entry)) {
         if (entry->natInfo.natAction) {
             LOCK_STATE_EX lockStateNat;
@@ -384,10 +430,13 @@  OvsCtEntryDelete(POVS_CT_ENTRY entry, BOOLEAN forceDelete)
         }
         OvsPostCtEventEntry(entry, OVS_EVENT_CT_DELETE);
         RemoveEntryList(&entry->link);
+        NdisReleaseRWLock(entry->lock, &lockState);
+        NdisFreeRWLock(entry->lock);
         OvsFreeMemoryWithTag(entry, OVS_CT_POOL_TAG);
         InterlockedDecrement((LONG volatile*)&ctTotalEntries);
         return;
     }
+    NdisReleaseRWLock(entry->lock, &lockState);
 }
 
 static __inline NDIS_STATUS
@@ -438,7 +487,8 @@  OvsCtLookup(OvsConntrackKeyLookupCtx *ctx)
     POVS_CT_ENTRY entry;
     BOOLEAN reply = FALSE;
     POVS_CT_ENTRY found = NULL;
-
+    LOCK_STATE_EX lockState, lockStateTable;
+    UINT32 bucketIdx;
     /* Reverse NAT must be performed before OvsCtLookup, so here
      * we simply need to flip the src and dst in key and compare
      * they are equal. Note that flipped key is not equal to
@@ -450,10 +500,11 @@  OvsCtLookup(OvsConntrackKeyLookupCtx *ctx)
     if (!ctTotalEntries) {
         return found;
     }
-
-    LIST_FORALL(&ovsConntrackTable[ctx->hash & CT_HASH_TABLE_MASK], link) {
+    bucketIdx = ctx->hash & CT_HASH_TABLE_MASK;
+    NdisAcquireRWLockRead(ovsCtBucketLock[bucketIdx], &lockStateTable, 0);
+    LIST_FORALL(&ovsConntrackTable[bucketIdx], link) {
         entry = CONTAINING_RECORD(link, OVS_CT_ENTRY, link);
-
+        NdisAcquireRWLockRead(entry->lock, &lockState, 0);
         if (OvsCtKeyAreSame(ctx->key, entry->key)) {
             found = entry;
             reply = FALSE;
@@ -470,10 +521,13 @@  OvsCtLookup(OvsConntrackKeyLookupCtx *ctx)
             } else {
                 ctx->reply = reply;
             }
+            NdisReleaseRWLock(entry->lock, &lockState);
             break;
         }
+        NdisReleaseRWLock(entry->lock, &lockState);
     }
 
+    NdisReleaseRWLock(ovsCtBucketLock[bucketIdx], &lockStateTable);
     ctx->entry = found;
     return found;
 }
@@ -623,6 +677,7 @@  OvsProcessConntrackEntry(OvsForwardingContext *fwdCtx,
     POVS_CT_ENTRY entry = ctx->entry;
     UINT32 state = 0;
     PNET_BUFFER_LIST curNbl = fwdCtx->curNbl;
+    LOCK_STATE_EX lockState, lockStateTable;
     *entryCreated = FALSE;
 
     /* If an entry was found, update the state based on TCP flags */
@@ -647,7 +702,9 @@  OvsProcessConntrackEntry(OvsForwardingContext *fwdCtx,
             break;
         case CT_UPDATE_NEW:
             //Delete and update the Conntrack
+            NdisAcquireRWLockWrite(ctx->entry->bucketLockRef, &lockStateTable, 0);
             OvsCtEntryDelete(ctx->entry, TRUE);
+            NdisReleaseRWLock(ctx->entry->bucketLockRef, &lockStateTable);
             ctx->entry = NULL;
             entry = OvsCtEntryCreate(fwdCtx, key->ipKey.nwProto, l4Offset,
                                      ctx, key, natInfo, commit, currentTime,
@@ -658,25 +715,26 @@  OvsProcessConntrackEntry(OvsForwardingContext *fwdCtx,
             break;
         }
     }
-
-    if (key->ipKey.nwProto == IPPROTO_TCP && entry) {
-        /* Update the related bit if there is a parent */
-        if (entry->parent) {
-            state |= OVS_CS_F_RELATED;
-        } else {
-            POVS_CT_ENTRY parentEntry;
-            parentEntry = OvsCtRelatedLookup(ctx->key, currentTime);
-            entry->parent = parentEntry;
-            if (parentEntry != NULL) {
+    if (entry) {
+        NdisAcquireRWLockRead(entry->lock, &lockState, 0);
+        if (key->ipKey.nwProto == IPPROTO_TCP) {
+            /* Update the related bit if there is a parent */
+            if (entry->parent) {
                 state |= OVS_CS_F_RELATED;
+            } else {
+                POVS_CT_ENTRY parentEntry;
+                parentEntry = OvsCtRelatedLookup(ctx->key, currentTime);
+                entry->parent = parentEntry;
+                if (parentEntry != NULL) {
+                    state |= OVS_CS_F_RELATED;
+                }
             }
         }
-    }
 
-    /* Copy mark and label from entry into flowKey. If actions specify
-       different mark and label, update the flowKey. */
-    if (entry != NULL) {
+        /* Copy mark and label from entry into flowKey. If actions specify
+           different mark and label, update the flowKey. */
         OvsCtUpdateFlowKey(key, state, zone, entry->mark, &entry->labels);
+        NdisReleaseRWLock(entry->lock, &lockState);
     } else {
         OvsCtUpdateFlowKey(key, state, zone, 0, NULL);
     }
@@ -730,6 +788,8 @@  OvsCtSetMarkLabel(OvsFlowKey *key,
                        MD_LABELS *labels,
                        BOOLEAN *triggerUpdateEvent)
 {
+    LOCK_STATE_EX lockState;
+    NdisAcquireRWLockWrite(entry->lock, &lockState, 0);
     if (mark) {
         OvsConntrackSetMark(key, entry, mark->value, mark->mask,
                             triggerUpdateEvent);
@@ -739,8 +799,15 @@  OvsCtSetMarkLabel(OvsFlowKey *key,
         OvsConntrackSetLabels(key, entry, &labels->value, &labels->mask,
                               triggerUpdateEvent);
     }
+    NdisReleaseRWLock(entry->lock, &lockState);
 }
 
+/*
+ *----------------------------------------------------------------------------
+ * OvsCtEntryExpired
+ *     Assumes ct entry lock is acquired
+ *----------------------------------------------------------------------------
+ */
 static __inline void
 OvsCtUpdateTuple(OvsFlowKey *key, OVS_CT_KEY *ctKey)
 {
@@ -776,23 +843,22 @@  OvsCtExecute_(OvsForwardingContext *fwdCtx,
     POVS_CT_ENTRY entry = NULL;
     PNET_BUFFER_LIST curNbl = fwdCtx->curNbl;
     OvsConntrackKeyLookupCtx ctx = { 0 };
-    LOCK_STATE_EX lockState;
+    LOCK_STATE_EX lockState, lockStateTable;
     UINT64 currentTime;
     NdisGetCurrentSystemTime((LARGE_INTEGER *) &currentTime);
 
-
     /* Retrieve the Conntrack Key related fields from packet */
     OvsCtSetupLookupCtx(key, zone, &ctx, curNbl, layers->l4Offset);
 
-    NdisAcquireRWLockWrite(ovsConntrackLockObj, &lockState, 0);
-
     /* Lookup Conntrack entries for a matching entry */
     entry = OvsCtLookup(&ctx);
     BOOLEAN entryCreated = FALSE;
 
     /* Delete entry in reverse direction if 'force' is specified */
     if (entry && force && ctx.reply) {
+        NdisAcquireRWLockWrite(entry->bucketLockRef, &lockStateTable, 0);
         OvsCtEntryDelete(entry, TRUE);
+        NdisReleaseRWLock(entry->bucketLockRef, &lockStateTable);
         entry = NULL;
     }
 
@@ -801,7 +867,6 @@  OvsCtExecute_(OvsForwardingContext *fwdCtx,
          * This blocks only new entries from being created and doesn't
          * affect existing connections.
          */
-        NdisReleaseRWLock(ovsConntrackLockObj, &lockState);
         OVS_LOG_ERROR("Conntrack Limit hit: %lu", ctTotalEntries);
         return NDIS_STATUS_RESOURCES;
     }
@@ -829,6 +894,7 @@  OvsCtExecute_(OvsForwardingContext *fwdCtx,
     if (entry == NULL) {
         return status;
     }
+
     /*
      * Note that natInfo is not the same as entry->natInfo here. natInfo
      * is decided by action in the openflow rule, entry->natInfo is decided
@@ -857,12 +923,15 @@  OvsCtExecute_(OvsForwardingContext *fwdCtx,
             OVS_LOG_ERROR("Error while parsing the FTP packet");
         }
     }
-
+    NdisAcquireRWLockRead(entry->lock, &lockState, 0);
     /* Add original tuple information to flow Key */
     if (entry->key.dl_type == ntohs(ETH_TYPE_IPV4)) {
         if (entry->parent != NULL) {
             POVS_CT_ENTRY parent = entry->parent;
+            LOCK_STATE_EX lockStateParent;
+            NdisAcquireRWLockRead(parent->lock, &lockStateParent, 0);
             OvsCtUpdateTuple(key, &parent->key);
+            NdisReleaseRWLock(parent->lock, &lockStateParent);
         } else {
             OvsCtUpdateTuple(key, &entry->key);
         }
@@ -875,7 +944,7 @@  OvsCtExecute_(OvsForwardingContext *fwdCtx,
         OvsPostCtEventEntry(entry, OVS_EVENT_CT_UPDATE);
     }
 
-    NdisReleaseRWLock(ovsConntrackLockObj, &lockState);
+    NdisReleaseRWLock(entry->lock, &lockState);
 
     return status;
 }
@@ -1039,13 +1108,7 @@  OvsConntrackEntryCleaner(PVOID data)
     BOOLEAN success = TRUE;
 
     while (success) {
-        if (ovsConntrackLockObj == NULL) {
-            /* Lock has been freed by 'OvsCleanupConntrack()' */
-            break;
-        }
-        NdisAcquireRWLockWrite(ovsConntrackLockObj, &lockState, 0);
         if (context->exit) {
-            NdisReleaseRWLock(ovsConntrackLockObj, &lockState);
             break;
         }
 
@@ -1053,14 +1116,15 @@  OvsConntrackEntryCleaner(PVOID data)
         INT64 threadSleepTimeout = -CT_CLEANUP_INTERVAL;
 
         if (ctTotalEntries) {
-            for (int i = 0; i < CT_HASH_TABLE_SIZE; i++) {
+            for (UINT32 i = 0; i < CT_HASH_TABLE_SIZE; i++) {
+                NdisAcquireRWLockWrite(ovsCtBucketLock[i], &lockState, 0);
                 LIST_FORALL_SAFE(&ovsConntrackTable[i], link, next) {
                     entry = CONTAINING_RECORD(link, OVS_CT_ENTRY, link);
                     OvsCtEntryDelete(entry, FALSE);
                 }
+                NdisReleaseRWLock(ovsCtBucketLock[i], &lockState);
             }
         }
-        NdisReleaseRWLock(ovsConntrackLockObj, &lockState);
         KeWaitForSingleObject(&context->event, Executive, KernelMode,
                               FALSE, (LARGE_INTEGER *)&threadSleepTimeout);
     }
@@ -1079,25 +1143,24 @@  OvsCtFlush(UINT16 zone)
 {
     PLIST_ENTRY link, next;
     POVS_CT_ENTRY entry;
-
     LOCK_STATE_EX lockState, lockStateNat;
-    NdisAcquireRWLockWrite(ovsConntrackLockObj, &lockState, 0);
 
     if (ctTotalEntries) {
         for (UINT32 i = 0; i < CT_HASH_TABLE_SIZE; i++) {
+            NdisAcquireRWLockWrite(ovsCtBucketLock[i], &lockState, 0);
             LIST_FORALL_SAFE(&ovsConntrackTable[i], link, next) {
                 entry = CONTAINING_RECORD(link, OVS_CT_ENTRY, link);
                 /* zone is a non-zero value */
                 if (!zone || zone == entry->key.zone)
                     OvsCtEntryDelete(entry, TRUE);
             }
+            NdisReleaseRWLock(ovsCtBucketLock[i], &lockState);
         }
     }
 
     NdisAcquireRWLockWrite(ovsCtNatLockObj, &lockStateNat, 0);
     OvsNatFlush(zone);
     NdisReleaseRWLock(ovsCtNatLockObj, &lockStateNat);
-    NdisReleaseRWLock(ovsConntrackLockObj, &lockState);
     return NDIS_STATUS_SUCCESS;
 }
 
@@ -1498,7 +1561,6 @@  OvsCreateNlMsgFromCtEntry(POVS_CT_ENTRY entry,
 
     nlMsg = (PNL_MSG_HDR)NlBufAt(&nlBuf, 0, 0);
     nlMsg->nlmsgLen = NlBufSize(&nlBuf);
-
     return STATUS_SUCCESS;
 }
 
@@ -1541,12 +1603,11 @@  OvsCtDumpCmdHandler(POVS_USER_PARAMS_CONTEXT usrParamsCtx,
     UINT32 i = CT_HASH_TABLE_SIZE;
     UINT32 outIndex = 0;
 
-    LOCK_STATE_EX lockState;
-    NdisAcquireRWLockRead(ovsConntrackLockObj, &lockState, 0);
-
+    LOCK_STATE_EX lockState, lockStateTable;
     if (ctTotalEntries) {
         for (i = inBucket; i < CT_HASH_TABLE_SIZE; i++) {
             PLIST_ENTRY head, link;
+            NdisAcquireRWLockRead(ovsCtBucketLock[i], &lockStateTable, 0);
             head = &ovsConntrackTable[i];
             POVS_CT_ENTRY entry = NULL;
 
@@ -1559,7 +1620,7 @@  OvsCtDumpCmdHandler(POVS_USER_PARAMS_CONTEXT usrParamsCtx,
                  */
                 if (outIndex >= inIndex) {
                     entry = CONTAINING_RECORD(link, OVS_CT_ENTRY, link);
-
+                    NdisAcquireRWLockRead(entry->lock, &lockState, 0);
                     rc = OvsCreateNlMsgFromCtEntry(entry,
                                                    usrParamsCtx->outputBuffer,
                                                    usrParamsCtx->outputLength,
@@ -1568,9 +1629,9 @@  OvsCtDumpCmdHandler(POVS_USER_PARAMS_CONTEXT usrParamsCtx,
                                                    msgIn->nlMsg.nlmsgPid,
                                                    msgIn->nfGenMsg.version,
                                                    0);
-
+                    NdisReleaseRWLock(entry->lock, &lockState);
                     if (rc != NDIS_STATUS_SUCCESS) {
-                        NdisReleaseRWLock(ovsConntrackLockObj, &lockState);
+                        NdisReleaseRWLock(ovsCtBucketLock[i], &lockStateTable);
                         return STATUS_UNSUCCESSFUL;
                     }
 
@@ -1580,7 +1641,7 @@  OvsCtDumpCmdHandler(POVS_USER_PARAMS_CONTEXT usrParamsCtx,
 
                 ++outIndex;
             }
-
+            NdisReleaseRWLock(ovsCtBucketLock[i], &lockStateTable);
             if (entry) {
                 break;
             }
@@ -1594,8 +1655,6 @@  OvsCtDumpCmdHandler(POVS_USER_PARAMS_CONTEXT usrParamsCtx,
     }
     instance->dumpState.index[0] = i;
     instance->dumpState.index[1] = outIndex;
-    NdisReleaseRWLock(ovsConntrackLockObj, &lockState);
-
     /* if i < CT_HASH_TABLE_SIZE => entry was found */
     if (i < CT_HASH_TABLE_SIZE) {
         POVS_MESSAGE msgOut = (POVS_MESSAGE)usrParamsCtx->outputBuffer;
diff --git a/datapath-windows/ovsext/Conntrack.h b/datapath-windows/ovsext/Conntrack.h
index 66957e8..6b9c56f 100644
--- a/datapath-windows/ovsext/Conntrack.h
+++ b/datapath-windows/ovsext/Conntrack.h
@@ -99,6 +99,9 @@  typedef struct _NAT_ACTION_INFO {
 } NAT_ACTION_INFO, *PNAT_ACTION_INFO;
 
 typedef struct OVS_CT_ENTRY {
+    /* Reference to ovsCtBucketLock of ovsConntrackTable.*/
+    PNDIS_RW_LOCK_EX bucketLockRef;
+    PNDIS_RW_LOCK_EX lock;       /* Protects OVS_CT_ENTRY. */
     OVS_CT_KEY  key;
     OVS_CT_KEY  rev_key;
     UINT64      expiration;