diff mbox series

[ovs-dev,v9] conntrack: Add rcu support.

Message ID 1557414907-74063-1-git-send-email-dlu998@gmail.com
State Accepted
Commit 967bb5c5cd9070112138d74a2f4394c50ae48420
Headers show
Series [ovs-dev,v9] conntrack: Add rcu support. | expand

Commit Message

Darrell Ball May 9, 2019, 3:15 p.m. UTC
For performance and code simplification reasons, add rcu support for
conntrack. The array of hmaps is replaced by a cmap as part of this
conversion.  Using a single map also simplifies the handling of NAT
and allows the removal of the nat_conn map and friends.  Per connection
entry locks are introduced, which are needed in a few code paths.

Signed-off-by: Darrell Ball <dlu998@gmail.com>
---

v9: Add missed mutable fields: mark and label.
    Added locking for these fields.

v8: Document conntrack_destroy() caller events (Ben comment).
    Also add to conntrack.h documentation and update 'ct' usage.

    Move 'struct conn' members around to group mutable and immutable
    members better and change some comments to be on the same line.
    (Ben comment).

    Move some inline comments after ';' (Ben comment).

    Adjust comments for struct conntrack so that field descriptions
    are inline (Ben comment).

v7: Document 'conn' locking better; not using OVS_GUARDED because that
    would require several OVS_NO_THREAD_SAFETY_ANALYSIS to avoid
    clang false positive. (Ben comment)

    Use OVS_PACKED_ENUM. (Ben comment)

v6: Patch 1 was applied.
    Fixed and simplified the expiry list logic; minimal impact on
    performance.

v5: Removed a wayward ovs_list_remove() from conn_update_expiration().

    Merged some conn level locking that was missed before and adjusted const
    specifier for conn, as a result.

v4: Just include first 2 patches as the other patches are moved to a
    subsequent series.

    Reinstated support for running multiple userspace datapaths at the
    same time.

    Fixed a bug where 'ovs_list_remove()' was misplaced.

    Cleaned up some unneeded code.

v3: Dropped Patch 3 from the series since it was broken/incomplete
    and not worth the fix, since there are some disadvantages in
    terms of maintainability, including treating UDP/ICMP differently
    than TCP, while the performance benefit was borderline.

    Fixed bug in 'nat_res_exhaustion' exception code.

    Cleaned up a few APIs - 'conn_clean*'/'delete_conn*' and
    removed an unneeded one, 'conn_available()'.  Added more
    comments.

    Removed non-essential 'nat' field from struct conn_lookup_ctx.

    Fixed some splicing issues between Patch 1 and Patch 2, where some
    aspects of Patch 2 were now moved to Patch 1.

v2: Put back somehow deleted '&' in Patch 1 in call to
    check_orig_tuple() for bucket parameter (which is
    removed in Patch 2).

 lib/conntrack-icmp.c    |  27 +-
 lib/conntrack-other.c   |  14 +-
 lib/conntrack-private.h | 247 ++++--------
 lib/conntrack-tcp.c     |  22 +-
 lib/conntrack.c         | 985 +++++++++++++++++-------------------------------
 lib/conntrack.h         |  19 +-
 lib/ct-dpif.h           |   2 +-
 7 files changed, 457 insertions(+), 859 deletions(-)

Comments

Ben Pfaff May 9, 2019, 10:18 p.m. UTC | #1
On Thu, May 09, 2019 at 08:15:07AM -0700, Darrell Ball wrote:
> For performance and code simplification reasons, add rcu support for
> conntrack. The array of hmaps is replaced by a cmap as part of this
> conversion.  Using a single map also simplifies the handling of NAT
> and allows the removal of the nat_conn map and friends.  Per connection
> entry locks are introduced, which are needed in a few code paths.
> 
> Signed-off-by: Darrell Ball <dlu998@gmail.com>

Thank you.  I applied this to master.
diff mbox series

Patch

diff --git a/lib/conntrack-icmp.c b/lib/conntrack-icmp.c
index 40fd1d8..63246f0 100644
--- a/lib/conntrack-icmp.c
+++ b/lib/conntrack-icmp.c
@@ -1,5 +1,5 @@ 
 /*
- * Copyright (c) 2015, 2016 Nicira, Inc.
+ * Copyright (c) 2015-2019 Nicira, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -24,14 +24,14 @@ 
 #include "conntrack-private.h"
 #include "dp-packet.h"
 
-enum icmp_state {
+enum OVS_PACKED_ENUM icmp_state {
     ICMPS_FIRST,
     ICMPS_REPLY,
 };
 
 struct conn_icmp {
     struct conn up;
-    enum icmp_state state;
+    enum icmp_state state; /* 'conn' lock protected. */
 };
 
 static const enum ct_timeout icmp_timeouts[] = {
@@ -46,16 +46,12 @@  conn_icmp_cast(const struct conn *conn)
 }
 
 static enum ct_update_res
-icmp_conn_update(struct conn *conn_, struct conntrack_bucket *ctb,
+icmp_conn_update(struct conntrack *ct, struct conn *conn_,
                  struct dp_packet *pkt OVS_UNUSED, bool reply, long long now)
 {
     struct conn_icmp *conn = conn_icmp_cast(conn_);
-
-    if (reply && conn->state != ICMPS_REPLY) {
-        conn->state = ICMPS_REPLY;
-    }
-
-    conn_update_expiration(ctb, &conn->up, icmp_timeouts[conn->state], now);
+    conn->state = reply ? ICMPS_REPLY : ICMPS_FIRST;
+    conn_update_expiration(ct, &conn->up, icmp_timeouts[conn->state], now);
 
     return CT_UPDATE_VALID;
 }
@@ -79,15 +75,12 @@  icmp6_valid_new(struct dp_packet *pkt)
 }
 
 static struct conn *
-icmp_new_conn(struct conntrack_bucket *ctb, struct dp_packet *pkt OVS_UNUSED,
-               long long now)
+icmp_new_conn(struct conntrack *ct, struct dp_packet *pkt OVS_UNUSED,
+              long long now)
 {
-    struct conn_icmp *conn;
-
-    conn = xzalloc(sizeof *conn);
+    struct conn_icmp *conn = xzalloc(sizeof *conn);
     conn->state = ICMPS_FIRST;
-
-    conn_init_expiration(ctb, &conn->up, icmp_timeouts[conn->state], now);
+    conn_init_expiration(ct, &conn->up, icmp_timeouts[conn->state], now);
 
     return &conn->up;
 }
diff --git a/lib/conntrack-other.c b/lib/conntrack-other.c
index 2920889..932f2f4 100644
--- a/lib/conntrack-other.c
+++ b/lib/conntrack-other.c
@@ -1,5 +1,5 @@ 
 /*
- * Copyright (c) 2015, 2016 Nicira, Inc.
+ * Copyright (c) 2015-2019 Nicira, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -19,7 +19,7 @@ 
 #include "conntrack-private.h"
 #include "dp-packet.h"
 
-enum other_state {
+enum OVS_PACKED_ENUM other_state {
     OTHERS_FIRST,
     OTHERS_MULTIPLE,
     OTHERS_BIDIR,
@@ -27,7 +27,7 @@  enum other_state {
 
 struct conn_other {
     struct conn up;
-    enum other_state state;
+    enum other_state state; /* 'conn' lock protected. */
 };
 
 static const enum ct_timeout other_timeouts[] = {
@@ -43,7 +43,7 @@  conn_other_cast(const struct conn *conn)
 }
 
 static enum ct_update_res
-other_conn_update(struct conn *conn_, struct conntrack_bucket *ctb,
+other_conn_update(struct conntrack *ct, struct conn *conn_,
                   struct dp_packet *pkt OVS_UNUSED, bool reply, long long now)
 {
     struct conn_other *conn = conn_other_cast(conn_);
@@ -54,7 +54,7 @@  other_conn_update(struct conn *conn_, struct conntrack_bucket *ctb,
         conn->state = OTHERS_MULTIPLE;
     }
 
-    conn_update_expiration(ctb, &conn->up, other_timeouts[conn->state], now);
+    conn_update_expiration(ct, &conn->up, other_timeouts[conn->state], now);
 
     return CT_UPDATE_VALID;
 }
@@ -66,7 +66,7 @@  other_valid_new(struct dp_packet *pkt OVS_UNUSED)
 }
 
 static struct conn *
-other_new_conn(struct conntrack_bucket *ctb, struct dp_packet *pkt OVS_UNUSED,
+other_new_conn(struct conntrack *ct, struct dp_packet *pkt OVS_UNUSED,
                long long now)
 {
     struct conn_other *conn;
@@ -74,7 +74,7 @@  other_new_conn(struct conntrack_bucket *ctb, struct dp_packet *pkt OVS_UNUSED,
     conn = xzalloc(sizeof *conn);
     conn->state = OTHERS_FIRST;
 
-    conn_init_expiration(ctb, &conn->up, other_timeouts[conn->state], now);
+    conn_init_expiration(ct, &conn->up, other_timeouts[conn->state], now);
 
     return &conn->up;
 }
diff --git a/lib/conntrack-private.h b/lib/conntrack-private.h
index 059af9e..51b7d7f 100644
--- a/lib/conntrack-private.h
+++ b/lib/conntrack-private.h
@@ -1,5 +1,5 @@ 
 /*
- * Copyright (c) 2015, 2016, 2017 Nicira, Inc.
+ * Copyright (c) 2015-2019 Nicira, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -21,8 +21,10 @@ 
 #include <netinet/in.h>
 #include <netinet/ip6.h>
 
+#include "cmap.h"
 #include "conntrack.h"
 #include "ct-dpif.h"
+#include "ipf.h"
 #include "openvswitch/hmap.h"
 #include "openvswitch/list.h"
 #include "openvswitch/types.h"
@@ -57,12 +59,6 @@  struct conn_key {
     uint8_t nw_proto;
 };
 
-struct nat_conn_key_node {
-    struct hmap_node node;
-    struct conn_key key;
-    struct conn_key value;
-};
-
 /* This is used for alg expectations; an expectation is a
  * context created in preparation for establishing a data
  * connection. The expectation is created by the control
@@ -87,25 +83,34 @@  struct alg_exp_node {
     bool nat_rpl_dst;
 };
 
+enum OVS_PACKED_ENUM ct_conn_type {
+    CT_CONN_TYPE_DEFAULT,
+    CT_CONN_TYPE_UN_NAT,
+};
+
 struct conn {
+    /* Immutable data. */
     struct conn_key key;
     struct conn_key rev_key;
-    /* Only used for orig_tuple support. */
-    struct conn_key master_key;
-    long long expiration;
+    struct conn_key master_key; /* Only used for orig_tuple support. */
     struct ovs_list exp_node;
-    struct hmap_node node;
-    ovs_u128 label;
-    /* XXX: consider flattening. */
+    struct cmap_node cm_node;
     struct nat_action_info_t *nat_info;
     char *alg;
-    int seq_skew;
+    struct conn *nat_conn; /* The NAT 'conn' context, if there is one. */
+
+    /* Mutable data. */
+    struct ovs_mutex lock; /* Guards all mutable fields. */
+    ovs_u128 label;
     uint32_t mark;
-    uint8_t conn_type;
-    /* TCP sequence skew due to NATTing of FTP control messages. */
-    uint8_t seq_skew_dir;
-    /* True if alg data connection. */
-    uint8_t alg_related;
+    long long expiration;
+    int seq_skew;
+    bool seq_skew_dir; /* TCP sequence skew direction due to NATTing of FTP
+                        * control messages; true if reply direction. */
+
+    /* Immutable data. */
+    bool alg_related; /* True if alg data connection. */
+    enum ct_conn_type conn_type;
 };
 
 enum ct_update_res {
@@ -114,78 +119,6 @@  enum ct_update_res {
     CT_UPDATE_NEW,
 };
 
-enum ct_conn_type {
-    CT_CONN_TYPE_DEFAULT,
-    CT_CONN_TYPE_UN_NAT,
-};
-
-/* 'struct ct_lock' is a wrapper for an adaptive mutex.  It's useful to try
- * different types of locks (e.g. spinlocks) */
-
-struct OVS_LOCKABLE ct_lock {
-    struct ovs_mutex lock;
-};
-
-static inline void ct_lock_init(struct ct_lock *lock)
-{
-    ovs_mutex_init_adaptive(&lock->lock);
-}
-
-static inline void ct_lock_lock(struct ct_lock *lock)
-    OVS_ACQUIRES(lock)
-    OVS_NO_THREAD_SAFETY_ANALYSIS
-{
-    ovs_mutex_lock(&lock->lock);
-}
-
-static inline void ct_lock_unlock(struct ct_lock *lock)
-    OVS_RELEASES(lock)
-    OVS_NO_THREAD_SAFETY_ANALYSIS
-{
-    ovs_mutex_unlock(&lock->lock);
-}
-
-static inline void ct_lock_destroy(struct ct_lock *lock)
-{
-    ovs_mutex_destroy(&lock->lock);
-}
-
-struct OVS_LOCKABLE ct_rwlock {
-    struct ovs_rwlock lock;
-};
-
-static inline void ct_rwlock_init(struct ct_rwlock *lock)
-{
-    ovs_rwlock_init(&lock->lock);
-}
-
-
-static inline void ct_rwlock_wrlock(struct ct_rwlock *lock)
-    OVS_ACQ_WRLOCK(lock)
-    OVS_NO_THREAD_SAFETY_ANALYSIS
-{
-    ovs_rwlock_wrlock(&lock->lock);
-}
-
-static inline void ct_rwlock_rdlock(struct ct_rwlock *lock)
-    OVS_ACQ_RDLOCK(lock)
-    OVS_NO_THREAD_SAFETY_ANALYSIS
-{
-    ovs_rwlock_rdlock(&lock->lock);
-}
-
-static inline void ct_rwlock_unlock(struct ct_rwlock *lock)
-    OVS_RELEASES(lock)
-    OVS_NO_THREAD_SAFETY_ANALYSIS
-{
-    ovs_rwlock_unlock(&lock->lock);
-}
-
-static inline void ct_rwlock_destroy(struct ct_rwlock *lock)
-{
-    ovs_rwlock_destroy(&lock->lock);
-}
-
 /* Timeouts: all the possible timeout states passed to update_expiration()
  * are listed here. The name will be prefix by CT_TM_ and the value is in
  * milliseconds */
@@ -217,115 +150,81 @@  enum ct_timeout {
     N_CT_TM
 };
 
-
-/* Locking:
- *
- * The connections are kept in different buckets, which are completely
- * independent. The connection bucket is determined by the hash of its key.
- *
- * Each bucket has two locks. Acquisition order is, from outermost to
- * innermost:
- *
- *    cleanup_mutex
- *    lock
- *
- * */
-struct conntrack_bucket {
-    /* Protects 'connections' and 'exp_lists'.  Used in the fast path */
-    struct ct_lock lock;
-    /* Contains the connections in the bucket, indexed by 'struct conn_key' */
-    struct hmap connections OVS_GUARDED;
-    /* For each possible timeout we have a list of connections. When the
-     * timeout of a connection is updated, we move it to the back of the list.
-     * Since the connection in a list have the same relative timeout, the list
-     * will be ordered, with the oldest connections to the front. */
-    struct ovs_list exp_lists[N_CT_TM] OVS_GUARDED;
-
-    /* Protects 'next_cleanup'. Used to make sure that there's only one thread
-     * performing the cleanup. */
-    struct ovs_mutex cleanup_mutex;
-    long long next_cleanup OVS_GUARDED;
-};
-
-#define CONNTRACK_BUCKETS_SHIFT 8
-#define CONNTRACK_BUCKETS (1 << CONNTRACK_BUCKETS_SHIFT)
-
 struct conntrack {
-    /* Independent buckets containing the connections */
-    struct conntrack_bucket buckets[CONNTRACK_BUCKETS];
-
-    /* Salt for hashing a connection key. */
-    uint32_t hash_basis;
-    /* The thread performing periodic cleanup of the connection
-     * tracker. */
-    pthread_t clean_thread;
-    /* Latch to destroy the 'clean_thread' */
-    struct latch clean_thread_exit;
-
-    /* Number of connections currently in the connection tracker. */
-    atomic_count n_conn;
-    /* Connections limit. When this limit is reached, no new connection
-     * will be accepted. */
-    atomic_uint n_conn_limit;
-
-    /* The following resources are referenced during nat connection
-     * creation and deletion. */
-    struct hmap nat_conn_keys OVS_GUARDED;
-    /* Hash table for alg expectations. Expectations are created
-     * by control connections to help create data connections. */
-    struct hmap alg_expectations OVS_GUARDED;
-    /* Used to lookup alg expectations from the control context. */
-    struct hindex alg_expectation_refs OVS_GUARDED;
-    /* Expiry list for alg expectations. */
-    struct ovs_list alg_exp_list OVS_GUARDED;
-    /* This lock is used during NAT connection creation and deletion;
-     * it is taken after a bucket lock and given back before that
-     * bucket unlock.
-     * This lock is similarly used to guard alg_expectations and
-     * alg_expectation_refs. If a bucket lock is also held during
-     * the normal code flow, then is must be taken first and released
-     * last.
-     */
-    struct ct_rwlock resources_lock;
+    struct ovs_mutex ct_lock; /* Protects 2 following fields. */
+    struct cmap conns OVS_GUARDED;
+    struct ovs_list exp_lists[N_CT_TM] OVS_GUARDED;
+    uint32_t hash_basis; /* Salt for hashing a connection key. */
+    pthread_t clean_thread; /* Periodically cleans up connection tracker. */
+    struct latch clean_thread_exit; /* To destroy the 'clean_thread'. */
+
+    /* Counting connections. */
+    atomic_count n_conn; /* Number of connections currently tracked. */
+    atomic_uint n_conn_limit; /* Max connections tracked. */
+
+    /* Expectations for application level gateways (created by control
+     * connections to help create data connections, e.g. for FTP). */
+    struct ovs_rwlock resources_lock; /* Protects fields below. */
+    struct hmap alg_expectations OVS_GUARDED; /* Holds struct
+                                               * alg_exp_nodes. */
+    struct hindex alg_expectation_refs OVS_GUARDED; /* For lookup from
+                                                     * control context.  */
 
     /* Fragmentation handling context. */
     struct ipf *ipf;
-
 };
 
+/* Lock acquisition order:
+ *    1. 'ct_lock'
+ *    2. 'conn->lock'
+ *    3. 'resources_lock'
+ */
+
+extern struct ct_l4_proto ct_proto_tcp;
+extern struct ct_l4_proto ct_proto_other;
+extern struct ct_l4_proto ct_proto_icmp4;
+extern struct ct_l4_proto ct_proto_icmp6;
+
 struct ct_l4_proto {
-    struct conn *(*new_conn)(struct conntrack_bucket *, struct dp_packet *pkt,
+    struct conn *(*new_conn)(struct conntrack *ct, struct dp_packet *pkt,
                              long long now);
     bool (*valid_new)(struct dp_packet *pkt);
-    enum ct_update_res (*conn_update)(struct conn *conn,
-                                      struct conntrack_bucket *,
+    enum ct_update_res (*conn_update)(struct conntrack *ct, struct conn *conn,
                                       struct dp_packet *pkt, bool reply,
                                       long long now);
     void (*conn_get_protoinfo)(const struct conn *,
                                struct ct_dpif_protoinfo *);
 };
 
-extern struct ct_l4_proto ct_proto_tcp;
-extern struct ct_l4_proto ct_proto_other;
-extern struct ct_l4_proto ct_proto_icmp4;
-extern struct ct_l4_proto ct_proto_icmp6;
-
 extern long long ct_timeout_val[];
 
+
+/* ct_lock must be held. */
 static inline void
-conn_init_expiration(struct conntrack_bucket *ctb, struct conn *conn,
-                        enum ct_timeout tm, long long now)
+conn_init_expiration(struct conntrack *ct, struct conn *conn,
+                     enum ct_timeout tm, long long now)
 {
     conn->expiration = now + ct_timeout_val[tm];
-    ovs_list_push_back(&ctb->exp_lists[tm], &conn->exp_node);
+    ovs_list_push_back(&ct->exp_lists[tm], &conn->exp_node);
 }
 
+/* The conn entry lock must be held on entry and exit. */
 static inline void
-conn_update_expiration(struct conntrack_bucket *ctb, struct conn *conn,
+conn_update_expiration(struct conntrack *ct, struct conn *conn,
                        enum ct_timeout tm, long long now)
+    OVS_NO_THREAD_SAFETY_ANALYSIS
 {
+    ovs_mutex_unlock(&conn->lock);
+
+    ovs_mutex_lock(&ct->ct_lock);
+    ovs_mutex_lock(&conn->lock);
+    conn->expiration = now + ct_timeout_val[tm];
     ovs_list_remove(&conn->exp_node);
-    conn_init_expiration(ctb, conn, tm, now);
+    ovs_list_push_back(&ct->exp_lists[tm], &conn->exp_node);
+    ovs_mutex_unlock(&conn->lock);
+    ovs_mutex_unlock(&ct->ct_lock);
+
+    ovs_mutex_lock(&conn->lock);
 }
 
 static inline uint32_t
diff --git a/lib/conntrack-tcp.c b/lib/conntrack-tcp.c
index 86d313d..397aca1 100644
--- a/lib/conntrack-tcp.c
+++ b/lib/conntrack-tcp.c
@@ -44,16 +44,16 @@ 
 #include "util.h"
 
 struct tcp_peer {
-    enum ct_dpif_tcp_state state;
     uint32_t               seqlo;          /* Max sequence number sent     */
     uint32_t               seqhi;          /* Max the other end ACKd + win */
     uint16_t               max_win;        /* largest window (pre scaling) */
     uint8_t                wscale;         /* window scaling factor        */
+    enum ct_dpif_tcp_state state;
 };
 
 struct conn_tcp {
     struct conn up;
-    struct tcp_peer peer[2];
+    struct tcp_peer peer[2]; /* 'conn' lock protected. */
 };
 
 enum {
@@ -145,7 +145,7 @@  tcp_get_wscale(const struct tcp_header *tcp)
 }
 
 static enum ct_update_res
-tcp_conn_update(struct conn *conn_, struct conntrack_bucket *ctb,
+tcp_conn_update(struct conntrack *ct, struct conn *conn_,
                 struct dp_packet *pkt, bool reply, long long now)
 {
     struct conn_tcp *conn = conn_tcp_cast(conn_);
@@ -317,18 +317,18 @@  tcp_conn_update(struct conn *conn_, struct conntrack_bucket *ctb,
 
         if (src->state >= CT_DPIF_TCPS_FIN_WAIT_2
             && dst->state >= CT_DPIF_TCPS_FIN_WAIT_2) {
-            conn_update_expiration(ctb, &conn->up, CT_TM_TCP_CLOSED, now);
+            conn_update_expiration(ct, &conn->up, CT_TM_TCP_CLOSED, now);
         } else if (src->state >= CT_DPIF_TCPS_CLOSING
                    && dst->state >= CT_DPIF_TCPS_CLOSING) {
-            conn_update_expiration(ctb, &conn->up, CT_TM_TCP_FIN_WAIT, now);
+            conn_update_expiration(ct, &conn->up, CT_TM_TCP_FIN_WAIT, now);
         } else if (src->state < CT_DPIF_TCPS_ESTABLISHED
                    || dst->state < CT_DPIF_TCPS_ESTABLISHED) {
-            conn_update_expiration(ctb, &conn->up, CT_TM_TCP_OPENING, now);
+            conn_update_expiration(ct, &conn->up, CT_TM_TCP_OPENING, now);
         } else if (src->state >= CT_DPIF_TCPS_CLOSING
                    || dst->state >= CT_DPIF_TCPS_CLOSING) {
-            conn_update_expiration(ctb, &conn->up, CT_TM_TCP_CLOSING, now);
+            conn_update_expiration(ct, &conn->up, CT_TM_TCP_CLOSING, now);
         } else {
-            conn_update_expiration(ctb, &conn->up, CT_TM_TCP_ESTABLISHED, now);
+            conn_update_expiration(ct, &conn->up, CT_TM_TCP_ESTABLISHED, now);
         }
     } else if ((dst->state < CT_DPIF_TCPS_SYN_SENT
                 || dst->state >= CT_DPIF_TCPS_FIN_WAIT_2
@@ -412,8 +412,7 @@  tcp_valid_new(struct dp_packet *pkt)
 }
 
 static struct conn *
-tcp_new_conn(struct conntrack_bucket *ctb, struct dp_packet *pkt,
-             long long now)
+tcp_new_conn(struct conntrack *ct, struct dp_packet *pkt, long long now)
 {
     struct conn_tcp* newconn = NULL;
     struct tcp_header *tcp = dp_packet_l4(pkt);
@@ -449,8 +448,7 @@  tcp_new_conn(struct conntrack_bucket *ctb, struct dp_packet *pkt,
     src->state = CT_DPIF_TCPS_SYN_SENT;
     dst->state = CT_DPIF_TCPS_CLOSED;
 
-    conn_init_expiration(ctb, &newconn->up, CT_TM_TCP_FIRST_PACKET,
-                         now);
+    conn_init_expiration(ct, &newconn->up, CT_TM_TCP_FIRST_PACKET, now);
 
     return &newconn->up;
 }
diff --git a/lib/conntrack.c b/lib/conntrack.c
index 49f6325..6711f5e 100644
--- a/lib/conntrack.c
+++ b/lib/conntrack.c
@@ -30,7 +30,6 @@ 
 #include "ct-dpif.h"
 #include "dp-packet.h"
 #include "flow.h"
-#include "ipf.h"
 #include "netdev.h"
 #include "odp-netlink.h"
 #include "openvswitch/hmap.h"
@@ -82,16 +81,15 @@  static bool conn_key_extract(struct conntrack *, struct dp_packet *,
                              uint16_t zone);
 static uint32_t conn_key_hash(const struct conn_key *, uint32_t basis);
 static void conn_key_reverse(struct conn_key *);
-static void conn_key_lookup(struct conntrack_bucket *ctb,
-                            struct conn_lookup_ctx *ctx,
-                            long long now);
 static bool valid_new(struct dp_packet *pkt, struct conn_key *);
-static struct conn *new_conn(struct conntrack_bucket *, struct dp_packet *pkt,
+static struct conn *new_conn(struct conntrack *ct, struct dp_packet *pkt,
                              struct conn_key *, long long now);
+static void delete_conn_cmn(struct conn *);
 static void delete_conn(struct conn *);
-static enum ct_update_res conn_update(struct conn *,
-                                      struct conntrack_bucket *ctb,
-                                      struct dp_packet *, bool reply,
+static void delete_conn_one(struct conn *conn);
+static enum ct_update_res conn_update(struct conntrack *ct, struct conn *conn,
+                                      struct dp_packet *pkt,
+                                      struct conn_lookup_ctx *ctx,
                                       long long now);
 static bool conn_expired(struct conn *, long long now);
 static void set_mark(struct dp_packet *, struct conn *,
@@ -101,21 +99,6 @@  static void set_label(struct dp_packet *, struct conn *,
                       const struct ovs_key_ct_labels *mask);
 static void *clean_thread_main(void *f_);
 
-static struct nat_conn_key_node *
-nat_conn_keys_lookup(struct hmap *nat_conn_keys,
-                     const struct conn_key *key,
-                     uint32_t basis);
-
-static bool
-nat_conn_keys_insert(struct hmap *nat_conn_keys,
-                     const struct conn *nat_conn,
-                     uint32_t hash_basis);
-
-static void
-nat_conn_keys_remove(struct hmap *nat_conn_keys,
-                     const struct conn_key *key,
-                     uint32_t basis);
-
 static bool
 nat_select_range_tuple(struct conntrack *ct, const struct conn *conn,
                        struct conn *nat_conn);
@@ -153,8 +136,7 @@  detect_ftp_ctl_type(const struct conn_lookup_ctx *ctx,
                     struct dp_packet *pkt);
 
 static void
-expectation_clean(struct conntrack *ct, const struct conn_key *master_key,
-                  uint32_t basis);
+expectation_clean(struct conntrack *ct, const struct conn_key *master_key);
 
 static struct ct_l4_proto *l4_protos[] = {
     [IPPROTO_TCP] = &ct_proto_tcp,
@@ -165,22 +147,20 @@  static struct ct_l4_proto *l4_protos[] = {
 
 static void
 handle_ftp_ctl(struct conntrack *ct, const struct conn_lookup_ctx *ctx,
-               struct dp_packet *pkt,
-               const struct conn *conn_for_expectation,
-               long long now, enum ftp_ctl_pkt ftp_ctl, bool nat);
+               struct dp_packet *pkt, struct conn *ec, long long now,
+               enum ftp_ctl_pkt ftp_ctl, bool nat);
 
 static void
 handle_tftp_ctl(struct conntrack *ct,
                 const struct conn_lookup_ctx *ctx OVS_UNUSED,
-                struct dp_packet *pkt,
-                const struct conn *conn_for_expectation,
-                long long now OVS_UNUSED,
-                enum ftp_ctl_pkt ftp_ctl OVS_UNUSED, bool nat OVS_UNUSED);
+                struct dp_packet *pkt, struct conn *conn_for_expectation,
+                long long now OVS_UNUSED, enum ftp_ctl_pkt ftp_ctl OVS_UNUSED,
+                bool nat OVS_UNUSED);
 
 typedef void (*alg_helper)(struct conntrack *ct,
                            const struct conn_lookup_ctx *ctx,
                            struct dp_packet *pkt,
-                           const struct conn *conn_for_expectation,
+                           struct conn *conn_for_expectation,
                            long long now, enum ftp_ctl_pkt ftp_ctl,
                            bool nat);
 
@@ -311,33 +291,22 @@  ct_print_conn_info(const struct conn *c, const char *log_msg,
 struct conntrack *
 conntrack_init(void)
 {
-    long long now = time_msec();
-
     struct conntrack *ct = xzalloc(sizeof *ct);
 
-    ct_rwlock_init(&ct->resources_lock);
-    ct_rwlock_wrlock(&ct->resources_lock);
-    hmap_init(&ct->nat_conn_keys);
+    ovs_rwlock_init(&ct->resources_lock);
+    ovs_rwlock_wrlock(&ct->resources_lock);
     hmap_init(&ct->alg_expectations);
     hindex_init(&ct->alg_expectation_refs);
-    ovs_list_init(&ct->alg_exp_list);
-    ct_rwlock_unlock(&ct->resources_lock);
-
-    for (unsigned i = 0; i < CONNTRACK_BUCKETS; i++) {
-        struct conntrack_bucket *ctb = &ct->buckets[i];
+    ovs_rwlock_unlock(&ct->resources_lock);
 
-        ct_lock_init(&ctb->lock);
-        ct_lock_lock(&ctb->lock);
-        hmap_init(&ctb->connections);
-        for (unsigned j = 0; j < ARRAY_SIZE(ctb->exp_lists); j++) {
-            ovs_list_init(&ctb->exp_lists[j]);
-        }
-        ct_lock_unlock(&ctb->lock);
-        ovs_mutex_init(&ctb->cleanup_mutex);
-        ovs_mutex_lock(&ctb->cleanup_mutex);
-        ctb->next_cleanup = now + CT_TM_MIN;
-        ovs_mutex_unlock(&ctb->cleanup_mutex);
+    ovs_mutex_init_adaptive(&ct->ct_lock);
+    ovs_mutex_lock(&ct->ct_lock);
+    cmap_init(&ct->conns);
+    for (unsigned i = 0; i < ARRAY_SIZE(ct->exp_lists); i++) {
+        ovs_list_init(&ct->exp_lists[i]);
     }
+    ovs_mutex_unlock(&ct->ct_lock);
+
     ct->hash_basis = random_uint32();
     atomic_count_init(&ct->n_conn, 0);
     atomic_init(&ct->n_conn_limit, DEFAULT_N_CONN_LIMIT);
@@ -348,59 +317,113 @@  conntrack_init(void)
     return ct;
 }
 
-/* Destroys the connection tracker 'ct' and frees all the allocated memory. */
+static void
+conn_clean_cmn(struct conntrack *ct, struct conn *conn)
+    OVS_REQUIRES(ct->ct_lock)
+{
+    if (conn->alg) {
+        expectation_clean(ct, &conn->key);
+    }
+
+    uint32_t hash = conn_key_hash(&conn->key, ct->hash_basis);
+    cmap_remove(&ct->conns, &conn->cm_node, hash);
+}
+
+/* Must be called with 'conn' of 'conn_type' CT_CONN_TYPE_DEFAULT.  Also
+ * removes the associated nat 'conn' from the lookup datastructures. */
+static void
+conn_clean(struct conntrack *ct, struct conn *conn)
+    OVS_REQUIRES(ct->ct_lock)
+{
+    ovs_assert(conn->conn_type == CT_CONN_TYPE_DEFAULT);
+
+    conn_clean_cmn(ct, conn);
+    if (conn->nat_conn) {
+        uint32_t hash = conn_key_hash(&conn->nat_conn->key, ct->hash_basis);
+        cmap_remove(&ct->conns, &conn->nat_conn->cm_node, hash);
+    }
+    ovs_list_remove(&conn->exp_node);
+    ovsrcu_postpone(delete_conn, conn);
+    atomic_count_dec(&ct->n_conn);
+}
+
+static void
+conn_clean_one(struct conntrack *ct, struct conn *conn)
+    OVS_REQUIRES(ct->ct_lock)
+{
+    conn_clean_cmn(ct, conn);
+    if (conn->conn_type == CT_CONN_TYPE_DEFAULT) {
+        ovs_list_remove(&conn->exp_node);
+        atomic_count_dec(&ct->n_conn);
+    }
+    ovsrcu_postpone(delete_conn_one, conn);
+}
+
+/* Destroys the connection tracker 'ct' and frees all the allocated memory.
+ * The caller of this function must already have shut down packet input
+ * and PMD threads (which would have been quiesced).  */
 void
 conntrack_destroy(struct conntrack *ct)
 {
+    struct conn *conn;
     latch_set(&ct->clean_thread_exit);
     pthread_join(ct->clean_thread, NULL);
     latch_destroy(&ct->clean_thread_exit);
-    for (unsigned i = 0; i < CONNTRACK_BUCKETS; i++) {
-        struct conntrack_bucket *ctb = &ct->buckets[i];
-        struct conn *conn;
 
-        ovs_mutex_lock(&ctb->cleanup_mutex);
-        ct_lock_lock(&ctb->lock);
-        HMAP_FOR_EACH_POP (conn, node, &ctb->connections) {
-            if (conn->conn_type == CT_CONN_TYPE_DEFAULT) {
-                atomic_count_dec(&ct->n_conn);
-            }
-            delete_conn(conn);
-        }
-        hmap_destroy(&ctb->connections);
-        ct_lock_unlock(&ctb->lock);
-        ovs_mutex_unlock(&ctb->cleanup_mutex);
-        ct_lock_destroy(&ctb->lock);
-        ovs_mutex_destroy(&ctb->cleanup_mutex);
+    ovs_mutex_lock(&ct->ct_lock);
+    CMAP_FOR_EACH (conn, cm_node, &ct->conns) {
+        conn_clean_one(ct, conn);
     }
-    ct_rwlock_wrlock(&ct->resources_lock);
-    struct nat_conn_key_node *nat_conn_key_node;
-    HMAP_FOR_EACH_POP (nat_conn_key_node, node, &ct->nat_conn_keys) {
-        free(nat_conn_key_node);
-    }
-    hmap_destroy(&ct->nat_conn_keys);
+    cmap_destroy(&ct->conns);
+    ovs_mutex_unlock(&ct->ct_lock);
+    ovs_mutex_destroy(&ct->ct_lock);
 
+    ovs_rwlock_wrlock(&ct->resources_lock);
     struct alg_exp_node *alg_exp_node;
     HMAP_FOR_EACH_POP (alg_exp_node, node, &ct->alg_expectations) {
         free(alg_exp_node);
     }
-
-    ovs_list_poison(&ct->alg_exp_list);
     hmap_destroy(&ct->alg_expectations);
     hindex_destroy(&ct->alg_expectation_refs);
-    ct_rwlock_unlock(&ct->resources_lock);
-    ct_rwlock_destroy(&ct->resources_lock);
+    ovs_rwlock_unlock(&ct->resources_lock);
+    ovs_rwlock_destroy(&ct->resources_lock);
+
     ipf_destroy(ct->ipf);
     free(ct);
 }
 
-static unsigned hash_to_bucket(uint32_t hash)
+
+static bool
+conn_key_lookup(struct conntrack *ct, const struct conn_key *key,
+                uint32_t hash, long long now, struct conn **conn_out,
+                bool *reply)
 {
-    /* Extracts the most significant bits in hash. The least significant bits
-     * are already used internally by the hmap implementation. */
-    BUILD_ASSERT(CONNTRACK_BUCKETS_SHIFT < 32 && CONNTRACK_BUCKETS_SHIFT >= 1);
+    struct conn *conn;
+    bool found = false;
+
+    CMAP_FOR_EACH_WITH_HASH (conn, cm_node, hash, &ct->conns) {
+        if (!conn_key_cmp(&conn->key, key) && !conn_expired(conn, now)) {
+            found = true;
+            if (reply) {
+                *reply = false;
+            }
+            break;
+        }
+        if (!conn_key_cmp(&conn->rev_key, key) && !conn_expired(conn, now)) {
+            found = true;
+            if (reply) {
+                *reply = true;
+            }
+            break;
+        }
+    }
 
-    return (hash >> (32 - CONNTRACK_BUCKETS_SHIFT)) % CONNTRACK_BUCKETS;
+    if (found && conn_out) {
+        *conn_out = conn;
+    } else if (conn_out) {
+        *conn_out = NULL;
+    }
+    return found;
 }
 
 static void
@@ -409,8 +432,16 @@  write_ct_md(struct dp_packet *pkt, uint16_t zone, const struct conn *conn,
 {
     pkt->md.ct_state |= CS_TRACKED;
     pkt->md.ct_zone = zone;
-    pkt->md.ct_mark = conn ? conn->mark : 0;
-    pkt->md.ct_label = conn ? conn->label : OVS_U128_ZERO;
+
+    if (conn) {
+        ovs_mutex_lock(&conn->lock);
+        pkt->md.ct_mark = conn->mark;
+        pkt->md.ct_label = conn->label;
+        ovs_mutex_unlock(&conn->lock);
+    } else {
+        pkt->md.ct_mark = 0;
+        pkt->md.ct_label = OVS_U128_ZERO;
+    }
 
     /* Use the original direction tuple if we have it. */
     if (conn) {
@@ -526,13 +557,14 @@  alg_src_ip_wc(enum ct_alg_ctl_type alg_ctl_type)
 static void
 handle_alg_ctl(struct conntrack *ct, const struct conn_lookup_ctx *ctx,
                struct dp_packet *pkt, enum ct_alg_ctl_type ct_alg_ctl,
-               const struct conn *conn, long long now, bool nat,
-               const struct conn *conn_for_expectation)
+               struct conn *conn, long long now, bool nat)
 {
     /* ALG control packet handling with expectation creation. */
     if (OVS_UNLIKELY(alg_helpers[ct_alg_ctl] && conn && conn->alg)) {
-        alg_helpers[ct_alg_ctl](ct, ctx, pkt, conn_for_expectation, now,
-                                CT_FTP_CTL_INTEREST, nat);
+        ovs_mutex_lock(&conn->lock);
+        alg_helpers[ct_alg_ctl](ct, ctx, pkt, conn, now, CT_FTP_CTL_INTEREST,
+                                nat);
+        ovs_mutex_unlock(&conn->lock);
     }
 }
 
@@ -744,144 +776,22 @@  un_nat_packet(struct dp_packet *pkt, const struct conn *conn,
     }
 }
 
-/* Typical usage of this helper is in non per-packet code;
- * this is because the bucket lock needs to be held for lookup
- * and a hash would have already been needed. Hence, this function
- * is just intended for code clarity. */
-static struct conn *
-conn_lookup(struct conntrack *ct, const struct conn_key *key, long long now)
-{
-    struct conn_lookup_ctx ctx;
-    ctx.conn = NULL;
-    memcpy(&ctx.key, key, sizeof ctx.key);
-    ctx.hash = conn_key_hash(key, ct->hash_basis);
-    unsigned bucket = hash_to_bucket(ctx.hash);
-    conn_key_lookup(&ct->buckets[bucket], &ctx, now);
-    return ctx.conn;
-}
-
-/* Only used when looking up 'CT_CONN_TYPE_DEFAULT' conns. */
-static struct conn *
-conn_lookup_def(const struct conn_key *key,
-                const struct conntrack_bucket *ctb, uint32_t hash)
-    OVS_REQUIRES(ctb->lock)
-{
-    struct conn *conn = NULL;
-
-    HMAP_FOR_EACH_WITH_HASH (conn, node, hash, &ctb->connections) {
-        if (!conn_key_cmp(&conn->key, key)
-            && conn->conn_type == CT_CONN_TYPE_DEFAULT) {
-            break;
-        }
-        if (!conn_key_cmp(&conn->rev_key, key)
-            && conn->conn_type == CT_CONN_TYPE_DEFAULT) {
-            break;
-        }
-    }
-    return conn;
-}
-
-static struct conn *
-conn_lookup_unnat(const struct conn_key *key,
-                  const struct conntrack_bucket *ctb, uint32_t hash)
-    OVS_REQUIRES(ctb->lock)
-{
-    struct conn *conn = NULL;
-
-    HMAP_FOR_EACH_WITH_HASH (conn, node, hash, &ctb->connections) {
-        if (!conn_key_cmp(&conn->key, key)
-            && conn->conn_type == CT_CONN_TYPE_UN_NAT) {
-            break;
-        }
-    }
-    return conn;
-}
-
 static void
-conn_seq_skew_set(struct conntrack *ct, const struct conn_key *key,
+conn_seq_skew_set(struct conntrack *ct, const struct conn *conn_in,
                   long long now, int seq_skew, bool seq_skew_dir)
+    OVS_NO_THREAD_SAFETY_ANALYSIS
 {
-    unsigned bucket = hash_to_bucket(conn_key_hash(key, ct->hash_basis));
-    ct_lock_lock(&ct->buckets[bucket].lock);
-    struct conn *conn = conn_lookup(ct, key, now);
+    struct conn *conn;
+    bool reply;
+    uint32_t hash = conn_key_hash(&conn_in->key, ct->hash_basis);
+    ovs_mutex_unlock(&conn_in->lock);
+    conn_key_lookup(ct, &conn_in->key, hash, now, &conn, &reply);
+    ovs_mutex_lock(&conn_in->lock);
+
     if (conn && seq_skew) {
         conn->seq_skew = seq_skew;
         conn->seq_skew_dir = seq_skew_dir;
     }
-    ct_lock_unlock(&ct->buckets[bucket].lock);
-}
-
-static void
-nat_clean(struct conntrack *ct, struct conn *conn,
-          struct conntrack_bucket *ctb)
-    OVS_REQUIRES(ctb->lock)
-{
-    ct_rwlock_wrlock(&ct->resources_lock);
-    nat_conn_keys_remove(&ct->nat_conn_keys, &conn->rev_key, ct->hash_basis);
-    ct_rwlock_unlock(&ct->resources_lock);
-    ct_lock_unlock(&ctb->lock);
-    uint32_t hash = conn_key_hash(&conn->rev_key, ct->hash_basis);
-    unsigned bucket_rev_conn = hash_to_bucket(hash);
-    ct_lock_lock(&ct->buckets[bucket_rev_conn].lock);
-    ct_rwlock_wrlock(&ct->resources_lock);
-    struct conn *rev_conn = conn_lookup_unnat(&conn->rev_key,
-                                              &ct->buckets[bucket_rev_conn],
-                                              hash);
-    struct nat_conn_key_node *nat_conn_key_node =
-        nat_conn_keys_lookup(&ct->nat_conn_keys, &conn->rev_key,
-                             ct->hash_basis);
-
-    /* In the unlikely event, rev conn was recreated, then skip
-     * rev_conn cleanup. */
-    if (rev_conn && (!nat_conn_key_node ||
-                     conn_key_cmp(&nat_conn_key_node->value,
-                                  &rev_conn->rev_key))) {
-        hmap_remove(&ct->buckets[bucket_rev_conn].connections,
-                    &rev_conn->node);
-        free(rev_conn);
-    }
-
-    delete_conn(conn);
-    ct_rwlock_unlock(&ct->resources_lock);
-    ct_lock_unlock(&ct->buckets[bucket_rev_conn].lock);
-    ct_lock_lock(&ctb->lock);
-}
-
-/* Must be called with 'CT_CONN_TYPE_DEFAULT' 'conn_type'. */
-static void
-conn_clean(struct conntrack *ct, struct conn *conn,
-           struct conntrack_bucket *ctb)
-    OVS_REQUIRES(ctb->lock)
-{
-    ovs_assert(conn->conn_type == CT_CONN_TYPE_DEFAULT);
-
-    if (conn->alg) {
-        expectation_clean(ct, &conn->key, ct->hash_basis);
-    }
-    ovs_list_remove(&conn->exp_node);
-    hmap_remove(&ctb->connections, &conn->node);
-    atomic_count_dec(&ct->n_conn);
-    if (conn->nat_info) {
-        nat_clean(ct, conn, ctb);
-    } else {
-        delete_conn(conn);
-    }
-}
-
-/* Only called for 'CT_CONN_TYPE_DEFAULT' conns; must be called with no
- * locks held and upon return no locks are held. */
-static void
-conn_clean_safe(struct conntrack *ct, struct conn *conn,
-                struct conntrack_bucket *ctb, uint32_t hash)
-{
-    ovs_mutex_lock(&ctb->cleanup_mutex);
-    ct_lock_lock(&ctb->lock);
-    conn = conn_lookup_def(&conn->key, ctb, hash);
-    if (conn) {
-        conn_clean(ct, conn, ctb);
-    }
-    ct_lock_unlock(&ctb->lock);
-    ovs_mutex_unlock(&ctb->cleanup_mutex);
 }
 
 static bool
@@ -904,18 +814,16 @@  ct_verify_helper(const char *helper, enum ct_alg_ctl_type ct_alg_ctl)
     }
 }
 
-/* This function is called with the bucket lock held. */
 static struct conn *
 conn_not_found(struct conntrack *ct, struct dp_packet *pkt,
                struct conn_lookup_ctx *ctx, bool commit, long long now,
                const struct nat_action_info_t *nat_action_info,
-               struct conn *conn_for_un_nat_copy,
-               const char *helper,
-               const struct alg_exp_node *alg_exp,
+               const char *helper, const struct alg_exp_node *alg_exp,
                enum ct_alg_ctl_type ct_alg_ctl)
+    OVS_REQUIRES(ct->ct_lock)
 {
     struct conn *nc = NULL;
-    struct conn connl;
+    struct conn *nat_conn = NULL;
 
     if (!valid_new(pkt, &ctx->key)) {
         pkt->md.ct_state = CS_INVALID;
@@ -931,15 +839,12 @@  conn_not_found(struct conntrack *ct, struct dp_packet *pkt,
     if (commit) {
         unsigned int n_conn_limit;
         atomic_read_relaxed(&ct->n_conn_limit, &n_conn_limit);
-
         if (atomic_count_get(&ct->n_conn) >= n_conn_limit) {
             COVERAGE_INC(conntrack_full);
             return nc;
         }
 
-        unsigned bucket = hash_to_bucket(ctx->hash);
-        nc = &connl;
-        memset(nc, 0, sizeof *nc);
+        nc = new_conn(ct, pkt, &ctx->key, now);
         memcpy(&nc->key, &ctx->key, sizeof nc->key);
         memcpy(&nc->rev_key, &nc->key, sizeof nc->rev_key);
         conn_key_reverse(&nc->rev_key);
@@ -957,6 +862,7 @@  conn_not_found(struct conntrack *ct, struct dp_packet *pkt,
 
         if (nat_action_info) {
             nc->nat_info = xmemdup(nat_action_info, sizeof *nc->nat_info);
+            nat_conn = xzalloc(sizeof *nat_conn);
 
             if (alg_exp) {
                 if (alg_exp->nat_rpl_dst) {
@@ -966,65 +872,49 @@  conn_not_found(struct conntrack *ct, struct dp_packet *pkt,
                     nc->rev_key.src.addr = alg_exp->alg_nat_repl_addr;
                     nc->nat_info->nat_action = NAT_ACTION_DST;
                 }
-                memcpy(conn_for_un_nat_copy, nc, sizeof *conn_for_un_nat_copy);
-                ct_rwlock_wrlock(&ct->resources_lock);
-                bool new_insert = nat_conn_keys_insert(&ct->nat_conn_keys,
-                                                       conn_for_un_nat_copy,
-                                                       ct->hash_basis);
-                ct_rwlock_unlock(&ct->resources_lock);
-                if (!new_insert) {
-                    char *log_msg = xasprintf("Pre-existing alg "
-                                              "nat_conn_key");
-                    ct_print_conn_info(conn_for_un_nat_copy, log_msg, VLL_INFO,
-                                       true, false);
-                    free(log_msg);
-                }
             } else {
-                memcpy(conn_for_un_nat_copy, nc, sizeof *conn_for_un_nat_copy);
-                ct_rwlock_wrlock(&ct->resources_lock);
-                bool nat_res = nat_select_range_tuple(ct, nc,
-                                                      conn_for_un_nat_copy);
-                ct_rwlock_unlock(&ct->resources_lock);
+                memcpy(nat_conn, nc, sizeof *nat_conn);
+                bool nat_res = nat_select_range_tuple(ct, nc, nat_conn);
 
                 if (!nat_res) {
                     goto nat_res_exhaustion;
                 }
 
-                /* Update nc with nat adjustments made to
-                 * conn_for_un_nat_copy by nat_select_range_tuple(). */
-                memcpy(nc, conn_for_un_nat_copy, sizeof *nc);
+                /* Update nc with nat adjustments made to nat_conn by
+                 * nat_select_range_tuple(). */
+                memcpy(nc, nat_conn, sizeof *nc);
             }
-            conn_for_un_nat_copy->conn_type = CT_CONN_TYPE_UN_NAT;
-            conn_for_un_nat_copy->nat_info = NULL;
-            conn_for_un_nat_copy->alg = NULL;
+
             nat_packet(pkt, nc, ctx->icmp_related);
-        }
-        struct conn *nconn = new_conn(&ct->buckets[bucket], pkt, &ctx->key,
-                                      now);
-        memcpy(&nconn->key, &nc->key, sizeof nconn->key);
-        memcpy(&nconn->rev_key, &nc->rev_key, sizeof nconn->rev_key);
-        memcpy(&nconn->master_key, &nc->master_key, sizeof nconn->master_key);
-        nconn->alg_related = nc->alg_related;
-        nconn->alg = nc->alg;
-        nconn->mark = nc->mark;
-        nconn->label = nc->label;
-        nconn->nat_info = nc->nat_info;
-        ctx->conn = nc = nconn;
-        hmap_insert(&ct->buckets[bucket].connections, &nconn->node, ctx->hash);
+            memcpy(&nat_conn->key, &nc->rev_key, sizeof nat_conn->key);
+            memcpy(&nat_conn->rev_key, &nc->key, sizeof nat_conn->rev_key);
+            nat_conn->conn_type = CT_CONN_TYPE_UN_NAT;
+            nat_conn->nat_info = NULL;
+            nat_conn->alg = NULL;
+            nat_conn->nat_conn = NULL;
+            uint32_t nat_hash = conn_key_hash(&nat_conn->key, ct->hash_basis);
+            cmap_insert(&ct->conns, &nat_conn->cm_node, nat_hash);
+        }
+
+        nc->nat_conn = nat_conn;
+        ovs_mutex_init_adaptive(&nc->lock);
+        nc->conn_type = CT_CONN_TYPE_DEFAULT;
+        cmap_insert(&ct->conns, &nc->cm_node, ctx->hash);
         atomic_count_inc(&ct->n_conn);
+        ctx->conn = nc; /* For completeness. */
     }
 
     return nc;
 
-    /* This would be a user error or a DOS attack.
-     * A user error is prevented by allocating enough
-     * combinations of NAT addresses when combined with
-     * ephemeral ports.  A DOS attack should be protected
-     * against with firewall rules or a separate firewall.
-     * Also using zone partitioning can limit DoS impact. */
+    /* This would be a user error or a DoS attack.  A user error is prevented
+     * by allocating enough combinations of NAT addresses when combined with
+     * ephemeral ports.  A DoS attack should be protected against with
+     * firewall rules or a separate firewall.  Also using zone partitioning
+     * can limit DoS impact. */
 nat_res_exhaustion:
-    free(nc->alg);
-    free(nc->nat_info);
+    free(nat_conn);
+    ovs_list_remove(&nc->exp_node);
+    delete_conn_cmn(nc);
     static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
     VLOG_WARN_RL(&rl, "Unable to NAT due to tuple space exhaustion - "
                  "if DoS attack, use firewalling and/or zone partitioning.");
@@ -1033,12 +923,11 @@  nat_res_exhaustion:
 
 static bool
 conn_update_state(struct conntrack *ct, struct dp_packet *pkt,
-                  struct conn_lookup_ctx *ctx, struct conn **conn,
-                  long long now, unsigned bucket)
-    OVS_REQUIRES(ct->buckets[bucket].lock)
+                  struct conn_lookup_ctx *ctx, struct conn *conn,
+                  long long now)
 {
+    ovs_assert(conn->conn_type == CT_CONN_TYPE_DEFAULT);
     bool create_new_conn = false;
-    struct conn lconn;
 
     if (ctx->icmp_related) {
         pkt->md.ct_state |= CS_RELATED;
@@ -1046,12 +935,11 @@  conn_update_state(struct conntrack *ct, struct dp_packet *pkt,
             pkt->md.ct_state |= CS_REPLY_DIR;
         }
     } else {
-        if ((*conn)->alg_related) {
+        if (conn->alg_related) {
             pkt->md.ct_state |= CS_RELATED;
         }
 
-        enum ct_update_res res = conn_update(*conn, &ct->buckets[bucket],
-                                             pkt, ctx->reply, now);
+        enum ct_update_res res = conn_update(ct, conn, pkt, ctx, now);
 
         switch (res) {
         case CT_UPDATE_VALID:
@@ -1065,10 +953,9 @@  conn_update_state(struct conntrack *ct, struct dp_packet *pkt,
             pkt->md.ct_state = CS_INVALID;
             break;
         case CT_UPDATE_NEW:
-            memcpy(&lconn, *conn, sizeof lconn);
-            ct_lock_unlock(&ct->buckets[bucket].lock);
-            conn_clean_safe(ct, &lconn, &ct->buckets[bucket], ctx->hash);
-            ct_lock_lock(&ct->buckets[bucket].lock);
+            ovs_mutex_lock(&ct->ct_lock);
+            conn_clean(ct, conn);
+            ovs_mutex_unlock(&ct->ct_lock);
             create_new_conn = true;
             break;
         default:
@@ -1079,51 +966,6 @@  conn_update_state(struct conntrack *ct, struct dp_packet *pkt,
 }
 
 static void
-create_un_nat_conn(struct conntrack *ct, struct conn *conn_for_un_nat_copy,
-                   long long now, bool alg_un_nat)
-{
-    struct conn *nc = xmemdup(conn_for_un_nat_copy, sizeof *nc);
-    memcpy(&nc->key, &conn_for_un_nat_copy->rev_key, sizeof nc->key);
-    memcpy(&nc->rev_key, &conn_for_un_nat_copy->key, sizeof nc->rev_key);
-    uint32_t un_nat_hash = conn_key_hash(&nc->key, ct->hash_basis);
-    unsigned un_nat_conn_bucket = hash_to_bucket(un_nat_hash);
-    ct_lock_lock(&ct->buckets[un_nat_conn_bucket].lock);
-    struct conn *rev_conn = conn_lookup(ct, &nc->key, now);
-
-    if (alg_un_nat) {
-        if (!rev_conn) {
-            hmap_insert(&ct->buckets[un_nat_conn_bucket].connections,
-                        &nc->node, un_nat_hash);
-        } else {
-            char *log_msg = xasprintf("Unusual condition for un_nat conn "
-                                      "create for alg: rev_conn %p", rev_conn);
-            ct_print_conn_info(nc, log_msg, VLL_INFO, true, false);
-            free(log_msg);
-            free(nc);
-        }
-    } else {
-        ct_rwlock_rdlock(&ct->resources_lock);
-
-        struct nat_conn_key_node *nat_conn_key_node =
-            nat_conn_keys_lookup(&ct->nat_conn_keys, &nc->key, ct->hash_basis);
-        if (nat_conn_key_node && !conn_key_cmp(&nat_conn_key_node->value,
-            &nc->rev_key) && !rev_conn) {
-            hmap_insert(&ct->buckets[un_nat_conn_bucket].connections,
-                        &nc->node, un_nat_hash);
-        } else {
-            char *log_msg = xasprintf("Unusual condition for un_nat conn "
-                                      "create: nat_conn_key_node/rev_conn "
-                                      "%p/%p", nat_conn_key_node, rev_conn);
-            ct_print_conn_info(nc, log_msg, VLL_INFO, true, false);
-            free(log_msg);
-            free(nc);
-        }
-        ct_rwlock_unlock(&ct->resources_lock);
-    }
-    ct_lock_unlock(&ct->buckets[un_nat_conn_bucket].lock);
-}
-
-static void
 handle_nat(struct dp_packet *pkt, struct conn *conn,
            uint16_t zone, bool reply, bool related)
 {
@@ -1146,9 +988,8 @@  handle_nat(struct dp_packet *pkt, struct conn *conn,
 static bool
 check_orig_tuple(struct conntrack *ct, struct dp_packet *pkt,
                  struct conn_lookup_ctx *ctx_in, long long now,
-                 unsigned *bucket, struct conn **conn,
+                 struct conn **conn,
                  const struct nat_action_info_t *nat_action_info)
-    OVS_REQUIRES(ct->buckets[*bucket].lock)
 {
     if ((ctx_in->key.dl_type == htons(ETH_TYPE_IP) &&
          !pkt->md.ct_orig_tuple.ipv4.ipv4_proto) ||
@@ -1159,83 +1000,76 @@  check_orig_tuple(struct conntrack *ct, struct dp_packet *pkt,
         return false;
     }
 
-    ct_lock_unlock(&ct->buckets[*bucket].lock);
-    struct conn_lookup_ctx ctx;
-    memset(&ctx, 0 , sizeof ctx);
-    ctx.conn = NULL;
+    struct conn_key key;
+    memset(&key, 0 , sizeof key);
 
     if (ctx_in->key.dl_type == htons(ETH_TYPE_IP)) {
-        ctx.key.src.addr.ipv4 = pkt->md.ct_orig_tuple.ipv4.ipv4_src;
-        ctx.key.dst.addr.ipv4 = pkt->md.ct_orig_tuple.ipv4.ipv4_dst;
+        key.src.addr.ipv4 = pkt->md.ct_orig_tuple.ipv4.ipv4_src;
+        key.dst.addr.ipv4 = pkt->md.ct_orig_tuple.ipv4.ipv4_dst;
 
         if (ctx_in->key.nw_proto == IPPROTO_ICMP) {
-            ctx.key.src.icmp_id = ctx_in->key.src.icmp_id;
-            ctx.key.dst.icmp_id = ctx_in->key.dst.icmp_id;
+            key.src.icmp_id = ctx_in->key.src.icmp_id;
+            key.dst.icmp_id = ctx_in->key.dst.icmp_id;
             uint16_t src_port = ntohs(pkt->md.ct_orig_tuple.ipv4.src_port);
-            ctx.key.src.icmp_type = (uint8_t) src_port;
-            ctx.key.dst.icmp_type = reverse_icmp_type(ctx.key.src.icmp_type);
+            key.src.icmp_type = (uint8_t) src_port;
+            key.dst.icmp_type = reverse_icmp_type(key.src.icmp_type);
         } else {
-            ctx.key.src.port = pkt->md.ct_orig_tuple.ipv4.src_port;
-            ctx.key.dst.port = pkt->md.ct_orig_tuple.ipv4.dst_port;
+            key.src.port = pkt->md.ct_orig_tuple.ipv4.src_port;
+            key.dst.port = pkt->md.ct_orig_tuple.ipv4.dst_port;
         }
-        ctx.key.nw_proto = pkt->md.ct_orig_tuple.ipv4.ipv4_proto;
+        key.nw_proto = pkt->md.ct_orig_tuple.ipv4.ipv4_proto;
     } else {
-        ctx.key.src.addr.ipv6 = pkt->md.ct_orig_tuple.ipv6.ipv6_src;
-        ctx.key.dst.addr.ipv6 = pkt->md.ct_orig_tuple.ipv6.ipv6_dst;
+        key.src.addr.ipv6 = pkt->md.ct_orig_tuple.ipv6.ipv6_src;
+        key.dst.addr.ipv6 = pkt->md.ct_orig_tuple.ipv6.ipv6_dst;
 
         if (ctx_in->key.nw_proto == IPPROTO_ICMPV6) {
-            ctx.key.src.icmp_id = ctx_in->key.src.icmp_id;
-            ctx.key.dst.icmp_id = ctx_in->key.dst.icmp_id;
+            key.src.icmp_id = ctx_in->key.src.icmp_id;
+            key.dst.icmp_id = ctx_in->key.dst.icmp_id;
             uint16_t src_port = ntohs(pkt->md.ct_orig_tuple.ipv6.src_port);
-            ctx.key.src.icmp_type = (uint8_t) src_port;
-            ctx.key.dst.icmp_type = reverse_icmp6_type(ctx.key.src.icmp_type);
+            key.src.icmp_type = (uint8_t) src_port;
+            key.dst.icmp_type = reverse_icmp6_type(key.src.icmp_type);
         } else {
-            ctx.key.src.port = pkt->md.ct_orig_tuple.ipv6.src_port;
-            ctx.key.dst.port = pkt->md.ct_orig_tuple.ipv6.dst_port;
+            key.src.port = pkt->md.ct_orig_tuple.ipv6.src_port;
+            key.dst.port = pkt->md.ct_orig_tuple.ipv6.dst_port;
         }
-        ctx.key.nw_proto = pkt->md.ct_orig_tuple.ipv6.ipv6_proto;
+        key.nw_proto = pkt->md.ct_orig_tuple.ipv6.ipv6_proto;
     }
 
-    ctx.key.dl_type = ctx_in->key.dl_type;
-    ctx.key.zone = pkt->md.ct_zone;
-    ctx.hash = conn_key_hash(&ctx.key, ct->hash_basis);
-    *bucket = hash_to_bucket(ctx.hash);
-    ct_lock_lock(&ct->buckets[*bucket].lock);
-    conn_key_lookup(&ct->buckets[*bucket], &ctx, now);
-    *conn = ctx.conn;
+    key.dl_type = ctx_in->key.dl_type;
+    key.zone = pkt->md.ct_zone;
+    uint32_t hash = conn_key_hash(&key, ct->hash_basis);
+    bool reply;
+    conn_key_lookup(ct, &key, hash, now, conn, &reply);
     return *conn ? true : false;
 }
 
 static bool
-is_un_nat_conn_valid(const struct conn *un_nat_conn)
-{
-    return un_nat_conn->conn_type == CT_CONN_TYPE_UN_NAT;
-}
-
-static bool
 conn_update_state_alg(struct conntrack *ct, struct dp_packet *pkt,
                       struct conn_lookup_ctx *ctx, struct conn *conn,
                       const struct nat_action_info_t *nat_action_info,
                       enum ct_alg_ctl_type ct_alg_ctl, long long now,
-                      unsigned bucket, bool *create_new_conn)
-    OVS_REQUIRES(ct->buckets[bucket].lock)
+                      bool *create_new_conn)
 {
     if (is_ftp_ctl(ct_alg_ctl)) {
         /* Keep sequence tracking in sync with the source of the
          * sequence skew. */
+        ovs_mutex_lock(&conn->lock);
         if (ctx->reply != conn->seq_skew_dir) {
             handle_ftp_ctl(ct, ctx, pkt, conn, now, CT_FTP_CTL_OTHER,
                            !!nat_action_info);
-            *create_new_conn = conn_update_state(ct, pkt, ctx, &conn, now,
-                                                bucket);
+            /* conn_update_state locks for unrelated fields, so unlock. */
+            ovs_mutex_unlock(&conn->lock);
+            *create_new_conn = conn_update_state(ct, pkt, ctx, conn, now);
         } else {
-            *create_new_conn = conn_update_state(ct, pkt, ctx, &conn, now,
-                                                bucket);
-
+            /* conn_update_state locks for unrelated fields, so unlock. */
+            ovs_mutex_unlock(&conn->lock);
+            *create_new_conn = conn_update_state(ct, pkt, ctx, conn, now);
+            ovs_mutex_lock(&conn->lock);
             if (*create_new_conn == false) {
                 handle_ftp_ctl(ct, ctx, pkt, conn, now, CT_FTP_CTL_OTHER,
                                !!nat_action_info);
             }
+            ovs_mutex_unlock(&conn->lock);
         }
         return true;
     }
@@ -1250,19 +1084,15 @@  process_one(struct conntrack *ct, struct dp_packet *pkt,
             const struct nat_action_info_t *nat_action_info,
             ovs_be16 tp_src, ovs_be16 tp_dst, const char *helper)
 {
-    struct conn *conn;
-    unsigned bucket = hash_to_bucket(ctx->hash);
-    ct_lock_lock(&ct->buckets[bucket].lock);
-    conn_key_lookup(&ct->buckets[bucket], ctx, now);
-    conn = ctx->conn;
+    bool create_new_conn = false;
+    conn_key_lookup(ct, &ctx->key, ctx->hash, now, &ctx->conn, &ctx->reply);
+    struct conn *conn = ctx->conn;
 
     /* Delete found entry if in wrong direction. 'force' implies commit. */
     if (OVS_UNLIKELY(force && ctx->reply && conn)) {
-        struct conn lconn;
-        memcpy(&lconn, conn, sizeof lconn);
-        ct_lock_unlock(&ct->buckets[bucket].lock);
-        conn_clean_safe(ct, &lconn, &ct->buckets[bucket], ctx->hash);
-        ct_lock_lock(&ct->buckets[bucket].lock);
+        ovs_mutex_lock(&ct->ct_lock);
+        conn_clean(ct, conn);
+        ovs_mutex_unlock(&ct->ct_lock);
         conn = NULL;
     }
 
@@ -1270,53 +1100,36 @@  process_one(struct conntrack *ct, struct dp_packet *pkt,
         if (conn->conn_type == CT_CONN_TYPE_UN_NAT) {
 
             ctx->reply = true;
+            struct conn *rev_conn = conn;  /* Save for debugging. */
+            uint32_t hash = conn_key_hash(&conn->rev_key, ct->hash_basis);
+            conn_key_lookup(ct, &ctx->key, hash, now, &conn, &ctx->reply);
 
-            struct conn_lookup_ctx ctx2;
-            ctx2.conn = NULL;
-            memcpy(&ctx2.key, &conn->rev_key, sizeof ctx2.key);
-            ctx2.hash = conn_key_hash(&conn->rev_key, ct->hash_basis);
-
-            ct_lock_unlock(&ct->buckets[bucket].lock);
-            bucket = hash_to_bucket(ctx2.hash);
-
-            ct_lock_lock(&ct->buckets[bucket].lock);
-            conn_key_lookup(&ct->buckets[bucket], &ctx2, now);
-
-            if (ctx2.conn) {
-                conn = ctx2.conn;
-            } else {
-                /* It is a race condition where conn has timed out and removed
-                 * between unlock of the rev_conn and lock of the forward conn;
-                 * nothing to do. */
+            if (!conn) {
                 pkt->md.ct_state |= CS_TRACKED | CS_INVALID;
-                ct_lock_unlock(&ct->buckets[bucket].lock);
+                char *log_msg = xasprintf("Missing master conn %p", rev_conn);
+                ct_print_conn_info(rev_conn, log_msg, VLL_INFO, true, true);
+                free(log_msg);
                 return;
             }
         }
     }
 
-    bool create_new_conn = false;
-    struct conn conn_for_un_nat_copy;
-    conn_for_un_nat_copy.conn_type = CT_CONN_TYPE_DEFAULT;
-
     enum ct_alg_ctl_type ct_alg_ctl = get_alg_ctl_type(pkt, tp_src, tp_dst,
                                                        helper);
 
     if (OVS_LIKELY(conn)) {
         if (OVS_LIKELY(!conn_update_state_alg(ct, pkt, ctx, conn,
                                               nat_action_info,
-                                              ct_alg_ctl, now, bucket,
+                                              ct_alg_ctl, now,
                                               &create_new_conn))) {
-            create_new_conn = conn_update_state(ct, pkt, ctx, &conn, now,
-                                                bucket);
+            create_new_conn = conn_update_state(ct, pkt, ctx, conn, now);
         }
         if (nat_action_info && !create_new_conn) {
             handle_nat(pkt, conn, zone, ctx->reply, ctx->icmp_related);
         }
 
-    } else if (check_orig_tuple(ct, pkt, ctx, now, &bucket, &conn,
-                               nat_action_info)) {
-        create_new_conn = conn_update_state(ct, pkt, ctx, &conn, now, bucket);
+    } else if (check_orig_tuple(ct, pkt, ctx, now, &conn, nat_action_info)) {
+        create_new_conn = conn_update_state(ct, pkt, ctx, conn, now);
     } else {
         if (ctx->icmp_related) {
             /* An icmp related conn should always be found; no new
@@ -1332,7 +1145,7 @@  process_one(struct conntrack *ct, struct dp_packet *pkt,
 
     if (OVS_UNLIKELY(create_new_conn)) {
 
-        ct_rwlock_rdlock(&ct->resources_lock);
+        ovs_rwlock_rdlock(&ct->resources_lock);
         alg_exp = expectation_lookup(&ct->alg_expectations, &ctx->key,
                                      ct->hash_basis,
                                      alg_src_ip_wc(ct_alg_ctl));
@@ -1340,11 +1153,12 @@  process_one(struct conntrack *ct, struct dp_packet *pkt,
             memcpy(&alg_exp_entry, alg_exp, sizeof alg_exp_entry);
             alg_exp = &alg_exp_entry;
         }
-        ct_rwlock_unlock(&ct->resources_lock);
+        ovs_rwlock_unlock(&ct->resources_lock);
 
+        ovs_mutex_lock(&ct->ct_lock);
         conn = conn_not_found(ct, pkt, ctx, commit, now, nat_action_info,
-                              &conn_for_un_nat_copy, helper, alg_exp,
-                              ct_alg_ctl);
+                              helper, alg_exp, ct_alg_ctl);
+        ovs_mutex_unlock(&ct->ct_lock);
     }
 
     write_ct_md(pkt, zone, conn, &ctx->key, alg_exp);
@@ -1357,19 +1171,7 @@  process_one(struct conntrack *ct, struct dp_packet *pkt,
         set_label(pkt, conn, &setlabel[0], &setlabel[1]);
     }
 
-    struct conn conn_for_expectation;
-    if (OVS_UNLIKELY((ct_alg_ctl != CT_ALG_CTL_NONE) && conn)) {
-        memcpy(&conn_for_expectation, conn, sizeof conn_for_expectation);
-    }
-
-    ct_lock_unlock(&ct->buckets[bucket].lock);
-
-    if (is_un_nat_conn_valid(&conn_for_un_nat_copy)) {
-        create_un_nat_conn(ct, &conn_for_un_nat_copy, now, !!alg_exp);
-    }
-
-    handle_alg_ctl(ct, ctx, pkt, ct_alg_ctl, conn, now, !!nat_action_info,
-                   &conn_for_expectation);
+    handle_alg_ctl(ct, ctx, pkt, ct_alg_ctl, conn, now, !!nat_action_info);
 }
 
 /* Sends the packets in '*pkt_batch' through the connection tracker 'ct'.  All
@@ -1423,12 +1225,14 @@  conntrack_clear(struct dp_packet *packet)
 static void
 set_mark(struct dp_packet *pkt, struct conn *conn, uint32_t val, uint32_t mask)
 {
+    ovs_mutex_lock(&conn->lock);
     if (conn->alg_related) {
         pkt->md.ct_mark = conn->mark;
     } else {
         pkt->md.ct_mark = val | (pkt->md.ct_mark & ~(mask));
         conn->mark = pkt->md.ct_mark;
     }
+    ovs_mutex_unlock(&conn->lock);
 }
 
 static void
@@ -1436,6 +1240,7 @@  set_label(struct dp_packet *pkt, struct conn *conn,
           const struct ovs_key_ct_labels *val,
           const struct ovs_key_ct_labels *mask)
 {
+    ovs_mutex_lock(&conn->lock);
     if (conn->alg_related) {
         pkt->md.ct_label = conn->label;
     } else {
@@ -1450,6 +1255,7 @@  set_label(struct dp_packet *pkt, struct conn *conn,
                               | (pkt->md.ct_label.u64.hi & ~(m.u64.hi));
         conn->label = pkt->md.ct_label;
     }
+    ovs_mutex_unlock(&conn->lock);
 }
 
 
@@ -1458,29 +1264,38 @@  set_label(struct dp_packet *pkt, struct conn *conn,
  * LLONG_MAX if 'ctb' is empty.  The return value might be smaller than 'now',
  * if 'limit' is reached */
 static long long
-sweep_bucket(struct conntrack *ct, struct conntrack_bucket *ctb,
-             long long now, size_t limit)
-    OVS_REQUIRES(ctb->lock)
+ct_sweep(struct conntrack *ct, long long now, size_t limit)
 {
     struct conn *conn, *next;
     long long min_expiration = LLONG_MAX;
     size_t count = 0;
 
+    ovs_mutex_lock(&ct->ct_lock);
+
     for (unsigned i = 0; i < N_CT_TM; i++) {
-        LIST_FOR_EACH_SAFE (conn, next, exp_node, &ctb->exp_lists[i]) {
-            if (!conn_expired(conn, now) || count >= limit) {
+        LIST_FOR_EACH_SAFE (conn, next, exp_node, &ct->exp_lists[i]) {
+            ovs_mutex_lock(&conn->lock);
+            if (now < conn->expiration || count >= limit) {
                 min_expiration = MIN(min_expiration, conn->expiration);
+                ovs_mutex_unlock(&conn->lock);
                 if (count >= limit) {
                     /* Do not check other lists. */
                     COVERAGE_INC(conntrack_long_cleanup);
-                    return min_expiration;
+                    goto out;
                 }
                 break;
+            } else {
+                ovs_mutex_unlock(&conn->lock);
+                conn_clean(ct, conn);
             }
-            conn_clean(ct, conn, ctb);
             count++;
         }
     }
+
+out:
+    VLOG_DBG("conntrack cleanup %"PRIuSIZE" entries in %lld msec", count,
+             time_msec() - now);
+    ovs_mutex_unlock(&ct->ct_lock);
     return min_expiration;
 }
 
@@ -1491,50 +1306,11 @@  sweep_bucket(struct conntrack *ct, struct conntrack_bucket *ctb,
 static long long
 conntrack_clean(struct conntrack *ct, long long now)
 {
-    long long next_wakeup = now + CT_TM_MIN;
     unsigned int n_conn_limit;
-    size_t clean_count = 0;
-
     atomic_read_relaxed(&ct->n_conn_limit, &n_conn_limit);
-    size_t clean_min = n_conn_limit > CONNTRACK_BUCKETS * 10
-        ? n_conn_limit / (CONNTRACK_BUCKETS * 10) : 1;
-
-    for (unsigned i = 0; i < CONNTRACK_BUCKETS; i++) {
-        struct conntrack_bucket *ctb = &ct->buckets[i];
-        size_t prev_count;
-        long long min_exp;
-
-        ovs_mutex_lock(&ctb->cleanup_mutex);
-        if (ctb->next_cleanup > now) {
-            goto next_bucket;
-        }
-
-        ct_lock_lock(&ctb->lock);
-        prev_count = hmap_count(&ctb->connections);
-        /* If the connections are well distributed among buckets, we want to
-         * limit to 10% of the global limit equally split among buckets. If
-         * the bucket is busier than the others, we limit to 10% of its
-         * current size. */
-        min_exp = sweep_bucket(ct, ctb, now, MAX(prev_count / 10, clean_min));
-        clean_count += prev_count - hmap_count(&ctb->connections);
-
-        if (min_exp > now) {
-            /* We call hmap_shrink() only if sweep_bucket() managed to delete
-             * every expired connection. */
-            hmap_shrink(&ctb->connections);
-        }
-
-        ct_lock_unlock(&ctb->lock);
-
-        ctb->next_cleanup = MIN(min_exp, now + CT_TM_MIN);
-
-next_bucket:
-        next_wakeup = MIN(next_wakeup, ctb->next_cleanup);
-        ovs_mutex_unlock(&ctb->cleanup_mutex);
-    }
-
-    VLOG_DBG("conntrack cleanup %"PRIuSIZE" entries in %lld msec",
-             clean_count, time_msec() - now);
+    size_t clean_max = n_conn_limit > 10 ? n_conn_limit / 10 : 1;
+    long long min_exp = ct_sweep(ct, now, clean_max);
+    long long next_wakeup = MIN(min_exp, now + CT_TM_MIN);
 
     return next_wakeup;
 }
@@ -1553,9 +1329,9 @@  next_bucket:
  *   are coping with the current cleanup tasks, then we wait at least
  *   5 seconds to do further cleanup.
  *
- * - We don't want to keep the buckets locked too long, as we might prevent
+ * - We don't want to keep the map locked too long, as we might prevent
  *   traffic from flowing.  CT_CLEAN_MIN_INTERVAL ensures that if cleanup is
- *   behind, there is at least some 200ms blocks of time when buckets will be
+ *   behind, there is at least some 200ms blocks of time when the map will be
  *   left alone, so the datapath can operate unhindered.
  */
 #define CT_CLEAN_INTERVAL 5000 /* 5 seconds */
@@ -2269,9 +2045,11 @@  nat_select_range_tuple(struct conntrack *ct, const struct conn *conn,
             nat_conn->rev_key.src.port = htons(port);
         }
 
-        bool new_insert = nat_conn_keys_insert(&ct->nat_conn_keys, nat_conn,
-                                               ct->hash_basis);
-        if (new_insert) {
+        uint32_t conn_hash = conn_key_hash(&nat_conn->rev_key,
+                                           ct->hash_basis);
+        bool found = conn_key_lookup(ct, &nat_conn->rev_key, conn_hash,
+                                     time_msec(), NULL, NULL);
+        if (!found) {
             return true;
         } else if (pat_enabled && !all_ports_tried) {
             if (min_port == max_port) {
@@ -2313,101 +2091,26 @@  nat_select_range_tuple(struct conntrack *ct, const struct conn *conn,
     return false;
 }
 
-/* This function must be called with the ct->resources lock taken. */
-static struct nat_conn_key_node *
-nat_conn_keys_lookup(struct hmap *nat_conn_keys,
-                     const struct conn_key *key,
-                     uint32_t basis)
-{
-    struct nat_conn_key_node *nat_conn_key_node;
-
-    HMAP_FOR_EACH_WITH_HASH (nat_conn_key_node, node,
-                             conn_key_hash(key, basis), nat_conn_keys) {
-        if (!conn_key_cmp(&nat_conn_key_node->key, key)) {
-            return nat_conn_key_node;
-        }
-    }
-    return NULL;
-}
-
-/* This function must be called with the ct->resources lock taken. */
-static bool
-nat_conn_keys_insert(struct hmap *nat_conn_keys, const struct conn *nat_conn,
-                     uint32_t basis)
-{
-    struct nat_conn_key_node *nat_conn_key_node =
-        nat_conn_keys_lookup(nat_conn_keys, &nat_conn->rev_key, basis);
-
-    if (!nat_conn_key_node) {
-        struct nat_conn_key_node *nat_conn_key = xzalloc(sizeof *nat_conn_key);
-        memcpy(&nat_conn_key->key, &nat_conn->rev_key,
-               sizeof nat_conn_key->key);
-        memcpy(&nat_conn_key->value, &nat_conn->key,
-               sizeof nat_conn_key->value);
-        hmap_insert(nat_conn_keys, &nat_conn_key->node,
-                    conn_key_hash(&nat_conn_key->key, basis));
-        return true;
-    }
-    return false;
-}
-
-/* This function must be called with the ct->resources write lock taken. */
-static void
-nat_conn_keys_remove(struct hmap *nat_conn_keys,
-                     const struct conn_key *key,
-                     uint32_t basis)
-{
-    struct nat_conn_key_node *nat_conn_key_node;
-
-    HMAP_FOR_EACH_WITH_HASH (nat_conn_key_node, node,
-                             conn_key_hash(key, basis), nat_conn_keys) {
-        if (!conn_key_cmp(&nat_conn_key_node->key, key)) {
-            hmap_remove(nat_conn_keys, &nat_conn_key_node->node);
-            free(nat_conn_key_node);
-            return;
-        }
-    }
-}
-
-static void
-conn_key_lookup(struct conntrack_bucket *ctb, struct conn_lookup_ctx *ctx,
-                long long now)
-    OVS_REQUIRES(ctb->lock)
-{
-    uint32_t hash = ctx->hash;
-    struct conn *conn;
-
-    ctx->conn = NULL;
-
-    HMAP_FOR_EACH_WITH_HASH (conn, node, hash, &ctb->connections) {
-        if (!conn_key_cmp(&conn->key, &ctx->key)
-                && !conn_expired(conn, now)) {
-            ctx->conn = conn;
-            ctx->reply = false;
-            break;
-        }
-        if (!conn_key_cmp(&conn->rev_key, &ctx->key)
-                && !conn_expired(conn, now)) {
-            ctx->conn = conn;
-            ctx->reply = true;
-            break;
-        }
-    }
-}
-
 static enum ct_update_res
-conn_update(struct conn *conn, struct conntrack_bucket *ctb,
-            struct dp_packet *pkt, bool reply, long long now)
+conn_update(struct conntrack *ct, struct conn *conn, struct dp_packet *pkt,
+            struct conn_lookup_ctx *ctx, long long now)
 {
-    return l4_protos[conn->key.nw_proto]->conn_update(conn, ctb, pkt,
-                                                      reply, now);
+    ovs_mutex_lock(&conn->lock);
+    enum ct_update_res update_res =
+        l4_protos[conn->key.nw_proto]->conn_update(ct, conn, pkt, ctx->reply,
+                                                   now);
+    ovs_mutex_unlock(&conn->lock);
+    return update_res;
 }
 
 static bool
 conn_expired(struct conn *conn, long long now)
 {
     if (conn->conn_type == CT_CONN_TYPE_DEFAULT) {
-        return now >= conn->expiration;
+        ovs_mutex_lock(&conn->lock); /* 'expiration' is read under lock. */
+        bool expired = now >= conn->expiration;
+        ovs_mutex_unlock(&conn->lock);
+        return expired;
     }
     return false;
 }
@@ -2419,19 +2122,38 @@  valid_new(struct dp_packet *pkt, struct conn_key *key)
 }
 
 static struct conn *
-new_conn(struct conntrack_bucket *ctb, struct dp_packet *pkt,
-         struct conn_key *key, long long now)
+new_conn(struct conntrack *ct, struct dp_packet *pkt, struct conn_key *key,
+         long long now)
 {
-    return l4_protos[key->nw_proto]->new_conn(ctb, pkt, now);
+    return l4_protos[key->nw_proto]->new_conn(ct, pkt, now);
 }
 
 static void
-delete_conn(struct conn *conn)
+delete_conn_cmn(struct conn *conn)
 {
     free(conn->nat_info);
     free(conn->alg);
     free(conn);
 }
+
+static void
+delete_conn(struct conn *conn)
+{
+    ovs_assert(conn->conn_type == CT_CONN_TYPE_DEFAULT);
+    ovs_mutex_destroy(&conn->lock);
+    free(conn->nat_conn);
+    delete_conn_cmn(conn);
+}
+
+/* Only used by conn_clean_one(). */
+static void
+delete_conn_one(struct conn *conn)
+{
+    if (conn->conn_type == CT_CONN_TYPE_DEFAULT) {
+        ovs_mutex_destroy(&conn->lock); /* Not done for other conn types. */
+    }
+    delete_conn_cmn(conn); /* 'nat_conn' is not freed here. */
+}
 
 /* Convert a conntrack address 'a' into an IP address 'b' based on 'dl_type'.
  *
@@ -2526,14 +2248,19 @@  conn_to_ct_dpif_entry(const struct conn *conn, struct ct_dpif_entry *entry,
     conn_key_to_tuple(&conn->rev_key, &entry->tuple_reply);
 
     entry->zone = conn->key.zone;
-    entry->mark = conn->mark;
 
+    ovs_mutex_lock(&conn->lock);
+    entry->mark = conn->mark;
     memcpy(&entry->labels, &conn->label, sizeof entry->labels);
+    ovs_mutex_unlock(&conn->lock);
+
     /* Not implemented yet */
     entry->timestamp.start = 0;
     entry->timestamp.stop = 0;
 
+    ovs_mutex_lock(&conn->lock);
     long long expiration = conn->expiration - now;
+    ovs_mutex_unlock(&conn->lock);
     entry->timeout = (expiration > 0) ? expiration / 1000 : 0;
 
     struct ct_l4_proto *class = l4_protos[conn->key.nw_proto];
@@ -2567,7 +2294,7 @@  conntrack_dump_start(struct conntrack *ct, struct conntrack_dump *dump,
     }
 
     dump->ct = ct;
-    *ptot_bkts = CONNTRACK_BUCKETS;
+    *ptot_bkts = 1; /* Need to clean up the callers. */
     return 0;
 }
 
@@ -2577,36 +2304,21 @@  conntrack_dump_next(struct conntrack_dump *dump, struct ct_dpif_entry *entry)
     struct conntrack *ct = dump->ct;
     long long now = time_msec();
 
-    while (dump->bucket < CONNTRACK_BUCKETS) {
-        struct hmap_node *node;
-
-        ct_lock_lock(&ct->buckets[dump->bucket].lock);
-        for (;;) {
-            struct conn *conn;
-
-            node = hmap_at_position(&ct->buckets[dump->bucket].connections,
-                                    &dump->bucket_pos);
-            if (!node) {
-                break;
-            }
-            INIT_CONTAINER(conn, node, node);
-            if ((!dump->filter_zone || conn->key.zone == dump->zone) &&
-                 (conn->conn_type != CT_CONN_TYPE_UN_NAT)) {
-                conn_to_ct_dpif_entry(conn, entry, now, dump->bucket);
-                break;
-            }
-            /* Else continue, until we find an entry in the appropriate zone
-             * or the bucket has been scanned completely. */
+    for (;;) {
+        struct cmap_node *cm_node = cmap_next_position(&ct->conns,
+                                                       &dump->cm_pos);
+        if (!cm_node) {
+            break;
         }
-        ct_lock_unlock(&ct->buckets[dump->bucket].lock);
-
-        if (!node) {
-            memset(&dump->bucket_pos, 0, sizeof dump->bucket_pos);
-            dump->bucket++;
-        } else {
+        struct conn *conn;
+        INIT_CONTAINER(conn, cm_node, cm_node);
+        if ((!dump->filter_zone || conn->key.zone == dump->zone) &&
+            (conn->conn_type != CT_CONN_TYPE_UN_NAT)) {
+            conn_to_ct_dpif_entry(conn, entry, now, 0);
             return 0;
         }
     }
+
     return EOF;
 }
 
@@ -2619,21 +2331,15 @@  conntrack_dump_done(struct conntrack_dump *dump OVS_UNUSED)
 int
 conntrack_flush(struct conntrack *ct, const uint16_t *zone)
 {
-    for (unsigned i = 0; i < CONNTRACK_BUCKETS; i++) {
-        struct conntrack_bucket *ctb = &ct->buckets[i];
-        ovs_mutex_lock(&ctb->cleanup_mutex);
-        ct_lock_lock(&ctb->lock);
-        for (unsigned j = 0; j < N_CT_TM; j++) {
-            struct conn *conn, *next;
-            LIST_FOR_EACH_SAFE (conn, next, exp_node, &ctb->exp_lists[j]) {
-                if (!zone || *zone == conn->key.zone) {
-                    conn_clean(ct, conn, ctb);
-                }
-            }
+    struct conn *conn;
+
+    ovs_mutex_lock(&ct->ct_lock);
+    CMAP_FOR_EACH (conn, cm_node, &ct->conns) {
+        if (!zone || *zone == conn->key.zone) {
+            conn_clean_one(ct, conn);
         }
-        ct_lock_unlock(&ctb->lock);
-        ovs_mutex_unlock(&ctb->cleanup_mutex);
     }
+    ovs_mutex_unlock(&ct->ct_lock);
 
     return 0;
 }
@@ -2648,20 +2354,18 @@  conntrack_flush_tuple(struct conntrack *ct, const struct ct_dpif_tuple *tuple,
     memset(&ctx, 0, sizeof(ctx));
     tuple_to_conn_key(tuple, zone, &ctx.key);
     ctx.hash = conn_key_hash(&ctx.key, ct->hash_basis);
-    unsigned bucket = hash_to_bucket(ctx.hash);
-    struct conntrack_bucket *ctb = &ct->buckets[bucket];
+    ovs_mutex_lock(&ct->ct_lock);
+    conn_key_lookup(ct, &ctx.key, ctx.hash, time_msec(), &ctx.conn,
+                    &ctx.reply);
 
-    ovs_mutex_lock(&ctb->cleanup_mutex);
-    ct_lock_lock(&ctb->lock);
-    conn_key_lookup(ctb, &ctx, time_msec());
     if (ctx.conn && ctx.conn->conn_type == CT_CONN_TYPE_DEFAULT) {
-        conn_clean(ct, ctx.conn, ctb);
+        conn_clean(ct, ctx.conn);
     } else {
         VLOG_WARN("Must flush tuple using the original pre-NATed tuple");
         error = ENOENT;
     }
-    ct_lock_unlock(&ctb->lock);
-    ovs_mutex_unlock(&ctb->cleanup_mutex);
+
+    ovs_mutex_unlock(&ct->ct_lock);
     return error;
 }
 
@@ -2762,23 +2466,23 @@  expectation_ref_create(struct hindex *alg_expectation_refs,
 }
 
 static void
-expectation_clean(struct conntrack *ct, const struct conn_key *master_key,
-                  uint32_t basis)
+expectation_clean(struct conntrack *ct, const struct conn_key *master_key)
 {
-    ct_rwlock_wrlock(&ct->resources_lock);
+    ovs_rwlock_wrlock(&ct->resources_lock);
 
     struct alg_exp_node *node, *next;
     HINDEX_FOR_EACH_WITH_HASH_SAFE (node, next, node_ref,
-                                    conn_key_hash(master_key, basis),
+                                    conn_key_hash(master_key, ct->hash_basis),
                                     &ct->alg_expectation_refs) {
         if (!conn_key_cmp(&node->master_key, master_key)) {
-            expectation_remove(&ct->alg_expectations, &node->key, basis);
+            expectation_remove(&ct->alg_expectations, &node->key,
+                               ct->hash_basis);
             hindex_remove(&ct->alg_expectation_refs, &node->node_ref);
             free(node);
         }
     }
 
-    ct_rwlock_unlock(&ct->resources_lock);
+    ovs_rwlock_unlock(&ct->resources_lock);
 }
 
 static void
@@ -2836,12 +2540,12 @@  expectation_create(struct conntrack *ct, ovs_be16 dst_port,
     /* Take the write lock here because it is almost 100%
      * likely that the lookup will fail and
      * expectation_create() will be called below. */
-    ct_rwlock_wrlock(&ct->resources_lock);
+    ovs_rwlock_wrlock(&ct->resources_lock);
     struct alg_exp_node *alg_exp = expectation_lookup(
         &ct->alg_expectations, &alg_exp_node->key, ct->hash_basis, src_ip_wc);
     if (alg_exp) {
         free(alg_exp_node);
-        ct_rwlock_unlock(&ct->resources_lock);
+        ovs_rwlock_unlock(&ct->resources_lock);
         return;
     }
 
@@ -2850,7 +2554,7 @@  expectation_create(struct conntrack *ct, ovs_be16 dst_port,
                 conn_key_hash(&alg_exp_node->key, ct->hash_basis));
     expectation_ref_create(&ct->alg_expectation_refs, alg_exp_node,
                            ct->hash_basis);
-    ct_rwlock_unlock(&ct->resources_lock);
+    ovs_rwlock_unlock(&ct->resources_lock);
 }
 
 static void
@@ -3252,7 +2956,7 @@  adj_seqnum(ovs_16aligned_be32 *val, int32_t inc)
 
 static void
 handle_ftp_ctl(struct conntrack *ct, const struct conn_lookup_ctx *ctx,
-               struct dp_packet *pkt, const struct conn *ec, long long now,
+               struct dp_packet *pkt, struct conn *ec, long long now,
                enum ftp_ctl_pkt ftp_ctl, bool nat)
 {
     struct ip_header *l3_hdr = dp_packet_l3(pkt);
@@ -3342,7 +3046,7 @@  handle_ftp_ctl(struct conntrack *ct, const struct conn_lookup_ctx *ctx,
     }
 
     if (seq_skew) {
-        conn_seq_skew_set(ct, &ec->key, now, seq_skew + ec->seq_skew,
+        conn_seq_skew_set(ct, ec, now, seq_skew + ec->seq_skew,
                           ctx->reply);
     }
 }
@@ -3350,10 +3054,9 @@  handle_ftp_ctl(struct conntrack *ct, const struct conn_lookup_ctx *ctx,
 static void
 handle_tftp_ctl(struct conntrack *ct,
                 const struct conn_lookup_ctx *ctx OVS_UNUSED,
-                struct dp_packet *pkt,
-                const struct conn *conn_for_expectation,
-                long long now OVS_UNUSED,
-                enum ftp_ctl_pkt ftp_ctl OVS_UNUSED, bool nat OVS_UNUSED)
+                struct dp_packet *pkt, struct conn *conn_for_expectation,
+                long long now OVS_UNUSED, enum ftp_ctl_pkt ftp_ctl OVS_UNUSED,
+                bool nat OVS_UNUSED)
 {
     expectation_create(ct, conn_for_expectation->key.src.port,
                        conn_for_expectation,
diff --git a/lib/conntrack.h b/lib/conntrack.h
index 8f4095f..2012150 100644
--- a/lib/conntrack.h
+++ b/lib/conntrack.h
@@ -19,6 +19,7 @@ 
 
 #include <stdbool.h>
 
+#include "cmap.h"
 #include "latch.h"
 #include "odp-netlink.h"
 #include "openvswitch/hmap.h"
@@ -38,20 +39,25 @@ 
  * Usage
  * =====
  *
- *     struct conntrack ct;
+ *     struct conntrack *ct;
  *
  * Initialization:
  *
- *     conntrack_init(&ct);
+ *     ct = conntrack_init();
  *
  * To send a group of packets through the connection tracker:
  *
- *     conntrack_execute(&ct, pkt_batch, ...);
+ *     conntrack_execute(ct, pkt_batch, ...);
  *
- * Thread-safety
- * =============
+ * Thread-safety:
  *
  * conntrack_execute() can be called by multiple threads simultaneoulsy.
+ *
+ * Shutdown:
+ *
+ *    1/ Shutdown packet input to the datapath
+ *    2/ Destroy PMD threads after quiescence.
+ *    3/ conntrack_destroy(ct);
  */
 
 struct dp_packet_batch;
@@ -93,7 +99,7 @@  void conntrack_clear(struct dp_packet *packet);
 struct conntrack_dump {
     struct conntrack *ct;
     unsigned bucket;
-    struct hmap_position bucket_pos;
+    struct cmap_position cm_pos;
     bool filter_zone;
     uint16_t zone;
 };
@@ -114,5 +120,4 @@  int conntrack_get_maxconns(struct conntrack *ct, uint32_t *maxconns);
 int conntrack_get_nconns(struct conntrack *ct, uint32_t *nconns);
 struct ipf *conntrack_ipf_ctx(struct conntrack *ct);
 
-
 #endif /* conntrack.h */
diff --git a/lib/ct-dpif.h b/lib/ct-dpif.h
index 0151cfe..2628c2b 100644
--- a/lib/ct-dpif.h
+++ b/lib/ct-dpif.h
@@ -73,7 +73,7 @@  struct ct_dpif_timestamp {
     CT_DPIF_TCP_STATE(TIME_WAIT) \
     CT_DPIF_TCP_STATE(MAX_NUM)
 
-enum ct_dpif_tcp_state {
+enum OVS_PACKED_ENUM ct_dpif_tcp_state {
 #define CT_DPIF_TCP_STATE(STATE) CT_DPIF_TCPS_##STATE,
     CT_DPIF_TCP_STATES
 #undef CT_DPIF_TCP_STATE