
[ovs-dev,RFC,4/5] conntrack: Split single cmap to multiple buckets.

Message ID 163820916292.3001886.15857595669410962866.stgit@fed.void
State RFC
Series conntrack: Introduce buckets and reduce contention.

Commit Message

Paolo Valerio Nov. 29, 2021, 6:06 p.m. UTC
The purpose of this commit is to split the current way of storing the
conn nodes. Before this patch the nodes were stored into a single cmap
using ct->lock to avoid concurrent write access.
With this commit a single connection can be stored into one or two (at
most) CONNTRACK_BUCKETS available based on the outcome of the function
hash_scale() on the key.
Every bucket has its local lock that needs to be acquired every time a
node has to be removed/inserted from/to the cmap.
This means that, in case the hash of the CT_DIR_FWD key differs from
the one of the CT_DIR_REV, we can end up having the reference of the
two key nodes in different buckets, and consequently acquiring two locks
(one per bucket).
This approach may be handy in different ways, depending on the way the
stale connection removal gets designed. The attempt of this patch is
to remove the expiration lists, removing the stale entries mostly in
two ways:

- during the key lookup
- when the sweeper task wakes up

the first case is not very strict, as we remove only expired entries
with the same hash. To increase its effectiveness, we should probably
increase the number of buckets and replace the cmaps with other data
structures like rcu lists.
The sweeper task instead takes charge of the remaining stale entries
removal. The heuristics used in the sweeper task are mostly an
example, but could be modified to match any possible uncovered use
case.

Signed-off-by: Paolo Valerio <pvalerio@redhat.com>
---
The cover letter includes further details.
---
 lib/conntrack-private.h |   34 +++
 lib/conntrack-tp.c      |   42 ----
 lib/conntrack.c         |  461 +++++++++++++++++++++++++++++++----------------
 tests/system-traffic.at |    5 -
 4 files changed, 331 insertions(+), 211 deletions(-)

Comments

Gaetan Rivet Jan. 27, 2022, 2:42 a.m. UTC | #1
Hi Paolo,

On Mon, Nov 29, 2021, at 19:06, Paolo Valerio wrote:
> The purpose of this commit is to split the current way of storing the
> conn nodes. Before this patch the nodes were stored into a single cmap
> using ct->lock to avoid concurrent write access.
> With this commit a single connection can be stored into one or two (at
> most) CONNTRACK_BUCKETS available based on the outcome of the function
> hash_scale() on the key.

I think this approach overall works.
Removing the expiration list altogether might be worth it, but it would be nice
to have data to back it up. It simplifies expiration update and removes
another shared data structure, which is nice. However to accept the change
we must ensure that the ct sweep is within acceptable bounds.

How do the two solutions compare regarding:

  * expiration latency (delta between expected expiration time vs. actual).
  * Memory consumption: depending on the structure, it's one additional node per conn.
    For a list, that represents 2x64 bits.
    Not that much, but that's a benefit of removing the exp list.
  * CPU utilization / average cycles per conn: how heavy is it on the CPU to
    iterate a CMAP vs. a linked-list?
    CMAPs are heavy in coherency traffic due to constant atomic reads, vs. a list
    that is mostly pointer dereference.

Maybe a micro-benchmark in tests/test-conntrack.c for the aging process could help
measure those metrics isolated from the rest of OVS.

That seems like quite a bit of additional work; maybe you would be able to provide a relevant
comparison without writing this additional benchmark.
On the other hand, maybe it only consists in inserting N conns, artificially advancing
the internal OVS clock by the timeout and measuring the behavior. I'm not sure.

I measured the time to insert 1M connections with your patchset, my ct-scale series and
the current baseline:

n-thread\version   baseline  ct-scale  ct-bucket
               1    831 ms    737 ms    715 ms
               2   1177 ms    882 ms    867 ms
               3   1524 ms   1000 ms   1012 ms
               4   2595 ms   1128 ms   1139 ms

So overall, at least on insertion in conntrack_execute, it seems your solution is
pretty much on par with the series I proposed, and both are improvements compared to the current baseline.

Scaling is not yet linear though, and the mutex locking still appears in profiling with
multiple threads.

> Every bucket has its local lock that needs to be acquired every time a
> node has to be removed/inserted from/to the cmap.
> This means that, in case the hash of the CT_DIR_FWD key differs from
> the one of the CT_DIR_REV, we can end up having the reference of the
> two key nodes in different buckets, and consequently acquiring two locks
> (one per bucket).
> This approach may be handy in different ways, depending on the way the
> stale connection removal gets designed. The attempt of this patch is
> to remove the expiration lists, removing the stale entries mostly in
> two ways:
>
> - during the key lookup
> - when the sweeper task wakes up
>
> the first case is not very strict, as we remove only expired entries
> with the same hash. To increase its effectiveness, we should probably
> increase the number of buckets and replace the cmaps with other data
> structures like rcu lists.

Did you try different numbers of buckets?
I see 2^10 being used now; it seems a bit large to me. Does it provide tangible
benefits? Did you measure contention vs. 512 or 256 buckets?

Replacing a CMAP by N rculists seems to be re-implementing a sharded CMAP with
one lock per internal bucket in a sense. However, the bucket array would be static
and would not resize the way the CMAP does.

It would result in a poorly implemented hash map with integrated
locks and a static number of buckets. It should either be possible to find a solution
with the existing structures, or a new structure should instead be cleanly designed.

> The sweeper task instead takes charge of the remaining stale entries
> removal. The heuristics used in the sweeper task are mostly an
> example, but could be modified to match any possible uncovered use
> case.

Which metric did you use to choose the new heuristics for the cleanup?

Otherwise I have a few remarks or suggestions below.

>
> Signed-off-by: Paolo Valerio <pvalerio@redhat.com>
> ---
> The cover letter includes further details.
> ---
>  lib/conntrack-private.h |   34 +++
>  lib/conntrack-tp.c      |   42 ----
>  lib/conntrack.c         |  461 +++++++++++++++++++++++++++++++----------------
>  tests/system-traffic.at |    5 -
>  4 files changed, 331 insertions(+), 211 deletions(-)
>
> diff --git a/lib/conntrack-private.h b/lib/conntrack-private.h
> index ea5ba3d9e..a89ff96fa 100644
> --- a/lib/conntrack-private.h
> +++ b/lib/conntrack-private.h
> @@ -95,6 +95,7 @@ struct alg_exp_node {
> 
>  struct conn_key_node {
>      struct conn_key key;
> +    uint32_t key_hash;

Caching the key hash is necessary, for quick lookup and deletion
in the cmap.

Unfortunately, as you need to use hash_scale() to find the corresponding
bucket, you are unable to use the OVS_ACQUIRES / OVS_RELEASES marks on
buckets_{lock,unlock}(), which throws off clang's safety analysis.

This safety analysis is very useful. The basic workaround would be to mark those functions
with OVS_NO_THREAD_SAFETY_ANALYSIS but it is not a good solution as it will mask issues.

Instead, a possible way is to store there the key_bucket, which will be the
bucket index corresponding to this key_hash. Then:

buckets_lock(ct, p1, p2)
   OVS_ACQUIRES(ct->ct_buckets[p1].lock, ct->ct_buckets[p2].lock)

buckets_unlock(ct, p1, p2)
   OVS_RELEASES(ct->ct_buckets[p1].lock, ct->ct_buckets[p2].lock)

where p1 == conn->key_node[0].key_bucket;
(by the way, those functions might be better renamed ct_bucket_lock() I think).

Not sure this solution would work, but I think it's definitely worth trying to find one.
If you select CC=clang with ./configure, -Wthread-safety should be automatically used.
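
For instance, a rough, untested sketch of what that could look like (the
'key_bucket' field and the ct_bucket_lock() name are hypothetical, not part
of this patch):

static void
ct_bucket_lock(struct conntrack *ct, unsigned p1, unsigned p2)
    OVS_ACQUIRES(ct->buckets[p1].lock, ct->buckets[p2].lock)
{
    if (p1 < p2) {
        ovs_mutex_lock(&ct->buckets[p1].lock);
        ovs_mutex_lock(&ct->buckets[p2].lock);
    } else if (p1 > p2) {
        ovs_mutex_lock(&ct->buckets[p2].lock);
        ovs_mutex_lock(&ct->buckets[p1].lock);
    } else {
        ovs_mutex_lock(&ct->buckets[p1].lock);
    }
}

with callers passing conn->key_node[dir].key_bucket instead of a hash, so the
lock expressions stay visible to the analysis.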

>      struct cmap_node cm_node;
>  };
> 
> @@ -102,7 +103,6 @@ struct conn {
>      /* Immutable data. */
>      struct conn_key_node key_node[CT_DIR_MAX];
>      struct conn_key parent_key; /* Only used for orig_tuple support. */
> -    struct ovs_list exp_node;
> 
>      uint16_t nat_action;
>      char *alg;
> @@ -121,7 +121,9 @@ struct conn {
>      /* Mutable data. */
>      bool seq_skew_dir; /* TCP sequence skew direction due to NATTing of FTP
>                          * control messages; true if reply direction. */
> -    bool cleaned; /* True if cleaned from expiry lists. */
> +    atomic_flag cleaned; /* True if the entry was stale and one of the
> +                          * cleaner (i.e. packet path or sweeper) took
> +                          * charge of it. */
> 
>      /* Immutable data. */
>      bool alg_related; /* True if alg data connection. */
> @@ -192,10 +194,25 @@ enum ct_timeout {
>      N_CT_TM
>  };
> 
> -struct conntrack {
> -    struct ovs_mutex ct_lock; /* Protects 2 following fields. */
> +#define CONNTRACK_BUCKETS_SHIFT 10
> +#define CONNTRACK_BUCKETS (1 << CONNTRACK_BUCKETS_SHIFT)
> +
> +struct ct_bucket {
> +    /* Protects 'conns'. In case of natted conns, there's a high
> +     * chance that the forward and the reverse key stand in different
> +     * buckets. buckets_lock() should be the preferred way to acquire
> +     * these locks (unless otherwise needed), as it deals with the
> +     * acquisition order. */
> +    struct ovs_mutex lock;
> +    /* Contains the connections in the bucket, indexed by
> +     * 'struct conn_key'. */
>      struct cmap conns OVS_GUARDED;
> -    struct ovs_list exp_lists[N_CT_TM] OVS_GUARDED;
> +};
> +
> +struct conntrack {
> +    struct ct_bucket buckets[CONNTRACK_BUCKETS];
> +    unsigned int next_bucket;
> +    struct ovs_mutex ct_lock;
>      struct cmap zone_limits OVS_GUARDED;
>      struct cmap timeout_policies OVS_GUARDED;
>      uint32_t hash_basis; /* Salt for hashing a connection key. */
> @@ -220,9 +237,10 @@ struct conntrack {
>  };
> 
>  /* Lock acquisition order:
> - *    1. 'ct_lock'
> - *    2. 'conn->lock'
> - *    3. 'resources_lock'
> + *    1. 'buckets[p1]->lock'
> + *    2  'buckets[p2]->lock' (with p1 < p2)
> + *    3. 'conn->lock'
> + *    4. 'resources_lock'
>   */
> 
>  extern struct ct_l4_proto ct_proto_tcp;
> diff --git a/lib/conntrack-tp.c b/lib/conntrack-tp.c
> index 9ecb06978..117810528 100644
> --- a/lib/conntrack-tp.c
> +++ b/lib/conntrack-tp.c
> @@ -236,27 +236,6 @@ tm_to_ct_dpif_tp(enum ct_timeout tm)
>      return CT_DPIF_TP_ATTR_MAX;
>  }
> 
> -static void
> -conn_update_expiration__(struct conntrack *ct, struct conn *conn,
> -                         enum ct_timeout tm, long long now,
> -                         uint32_t tp_value)
> -    OVS_REQUIRES(conn->lock)
> -{
> -    ovs_mutex_unlock(&conn->lock);
> -
> -    ovs_mutex_lock(&ct->ct_lock);
> -    ovs_mutex_lock(&conn->lock);
> -    if (!conn->cleaned) {
> -        conn->expiration = now + tp_value * 1000;
> -        ovs_list_remove(&conn->exp_node);
> -        ovs_list_push_back(&ct->exp_lists[tm], &conn->exp_node);
> -    }
> -    ovs_mutex_unlock(&conn->lock);
> -    ovs_mutex_unlock(&ct->ct_lock);
> -
> -    ovs_mutex_lock(&conn->lock);
> -}
> -
>  /* The conn entry lock must be held on entry and exit. */
>  void
>  conn_update_expiration(struct conntrack *ct, struct conn *conn,
> @@ -266,42 +245,25 @@ conn_update_expiration(struct conntrack *ct, struct conn *conn,
>      struct timeout_policy *tp;
>      uint32_t val;
> 
> -    ovs_mutex_unlock(&conn->lock);
> -
> -    ovs_mutex_lock(&ct->ct_lock);
> -    ovs_mutex_lock(&conn->lock);
>      tp = timeout_policy_lookup(ct, conn->tp_id);
>      if (tp) {
>          val = tp->policy.attrs[tm_to_ct_dpif_tp(tm)];
>      } else {
>          val = ct_dpif_netdev_tp_def[tm_to_ct_dpif_tp(tm)];
>      }
> -    ovs_mutex_unlock(&conn->lock);
> -    ovs_mutex_unlock(&ct->ct_lock);
> 
> -    ovs_mutex_lock(&conn->lock);
>      VLOG_DBG_RL(&rl, "Update timeout %s zone=%u with policy id=%d "
>                  "val=%u sec.",
>                  ct_timeout_str[tm], conn->key_node[CT_DIR_FWD].key.zone,
>                  conn->tp_id, val);
> 
> -    conn_update_expiration__(ct, conn, tm, now, val);
> -}
> -
> -static void
> -conn_init_expiration__(struct conntrack *ct, struct conn *conn,
> -                       enum ct_timeout tm, long long now,
> -                       uint32_t tp_value)
> -{
> -    conn->expiration = now + tp_value * 1000;
> -    ovs_list_push_back(&ct->exp_lists[tm], &conn->exp_node);
> +    conn->expiration = now + val * 1000;
>  }
> 
>  /* ct_lock must be held. */
>  void
>  conn_init_expiration(struct conntrack *ct, struct conn *conn,
>                       enum ct_timeout tm, long long now)
> -    OVS_REQUIRES(ct->ct_lock)
>  {
>      struct timeout_policy *tp;
>      uint32_t val;
> @@ -317,5 +279,5 @@ conn_init_expiration(struct conntrack *ct, struct conn *conn,
>                  ct_timeout_str[tm], conn->key_node[CT_DIR_FWD].key.zone,
>                  conn->tp_id, val);
> 
> -    conn_init_expiration__(ct, conn, tm, now, val);
> +    conn->expiration = now + val * 1000;
>  }
> diff --git a/lib/conntrack.c b/lib/conntrack.c
> index a284c57c0..1c019af29 100644
> --- a/lib/conntrack.c
> +++ b/lib/conntrack.c

At the beginning of the file, there is a remaining reference to a stat
that you otherwise removed:

@@ -44,7 +44,6 @@
 VLOG_DEFINE_THIS_MODULE(conntrack);

 COVERAGE_DEFINE(conntrack_full);
-COVERAGE_DEFINE(conntrack_long_cleanup);
 COVERAGE_DEFINE(conntrack_l3csum_err);
 COVERAGE_DEFINE(conntrack_l4csum_err);
 COVERAGE_DEFINE(conntrack_lookup_natted_miss);
@@ -560,6 +559,7 @@ conn_out_found:

> @@ -85,9 +85,12 @@ struct zone_limit {
>      struct conntrack_zone_limit czl;
>  };
> 
> +static unsigned hash_scale(uint32_t hash);
> +static void conn_clean(struct conntrack *ct, struct conn *conn);
>  static bool conn_key_extract(struct conntrack *, struct dp_packet *,
>                               ovs_be16 dl_type, struct conn_lookup_ctx *,
>                               uint16_t zone);
> +static uint32_t cached_key_hash(struct conn_key_node *n);

This function made me think something was being computed,
while it's only reading the key_node field. I think it's worth
just directly accessing it.
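
That is, at the call sites something like the following would arguably read
more clearly than the helper (just a sketch):

    hash = conn->key_node[CT_DIR_FWD].key_hash;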

>  static uint32_t conn_key_hash(const struct conn_key *, uint32_t basis);
>  static void conn_key_reverse(struct conn_key *);
>  static bool valid_new(struct dp_packet *pkt, struct conn_key *);
> @@ -109,8 +112,9 @@ static void set_label(struct dp_packet *, struct conn *,
>  static void *clean_thread_main(void *f_);
> 
>  static bool
> -nat_get_unique_tuple(struct conntrack *ct, struct conn *conn,
> -                     const struct nat_action_info_t *nat_info);
> +nat_get_unique_tuple_lock(struct conntrack *ct, struct conn *conn,
> +                          const struct nat_action_info_t *nat_info,
> +                          uint32_t *rev_hash);
> 
>  static uint8_t
>  reverse_icmp_type(uint8_t type);
> @@ -249,16 +253,17 @@ conntrack_init(void)
>      ovs_rwlock_unlock(&ct->resources_lock);
> 
>      ovs_mutex_init_adaptive(&ct->ct_lock);
> -    ovs_mutex_lock(&ct->ct_lock);
> -    cmap_init(&ct->conns);
> -    for (unsigned i = 0; i < ARRAY_SIZE(ct->exp_lists); i++) {
> -        ovs_list_init(&ct->exp_lists[i]);
> +
> +    ct->next_bucket = 0;
> +    for (unsigned i = 0; i < CONNTRACK_BUCKETS; i++) {
> +        struct ct_bucket *bucket = &ct->buckets[i];
> +        cmap_init(&bucket->conns);
> +        ovs_mutex_init_recursive(&bucket->lock);

Does it need to be recursive? How does it happen?
I think this mutex could be made adaptive, it might be worth profiling contention
and checking if it helps or not.

>      }
> +
>      cmap_init(&ct->zone_limits);
>      ct->zone_limit_seq = 0;
>      timeout_policy_init(ct);
> -    ovs_mutex_unlock(&ct->ct_lock);
> -
>      atomic_count_init(&ct->n_conn, 0);
>      atomic_init(&ct->n_conn_limit, DEFAULT_N_CONN_LIMIT);
>      atomic_init(&ct->tcp_seq_chk, true);
> @@ -410,9 +415,9 @@ zone_limit_delete(struct conntrack *ct, uint16_t zone)
>  }
> 
>  static void
> -conn_clean(struct conntrack *ct, struct conn *conn)
> -    OVS_REQUIRES(ct->ct_lock)
> +conn_clean__(struct conntrack *ct, struct conn *conn)
>  {
> +    struct ct_bucket *bucket;
>      struct zone_limit *zl;
>      uint32_t hash;
> 
> @@ -420,8 +425,9 @@ conn_clean(struct conntrack *ct, struct conn *conn)
>          expectation_clean(ct, &conn->key_node[CT_DIR_FWD].key);
>      }
> 
> -    hash = conn_key_hash(&conn->key_node[CT_DIR_FWD].key, ct->hash_basis);
> -    cmap_remove(&ct->conns, &conn->key_node[CT_DIR_FWD].cm_node, hash);
> +    hash = cached_key_hash(&conn->key_node[CT_DIR_FWD]);
> +    bucket = &ct->buckets[hash_scale(hash)];
> +    cmap_remove(&bucket->conns, &conn->key_node[CT_DIR_FWD].cm_node, hash);
> 
>      zl = zone_limit_lookup(ct, conn->admit_zone);
>      if (zl && zl->czl.zone_limit_seq == conn->zone_limit_seq) {
> @@ -429,12 +435,10 @@ conn_clean(struct conntrack *ct, struct conn *conn)
>      }
> 
>      if (conn->nat_action) {
> -        hash = conn_key_hash(&conn->key_node[CT_DIR_REV].key,
> -                             ct->hash_basis);
> -        cmap_remove(&ct->conns, &conn->key_node[CT_DIR_REV].cm_node, hash);
> +        hash = cached_key_hash(&conn->key_node[CT_DIR_REV]);
> +        bucket = &ct->buckets[hash_scale(hash)];
> +        cmap_remove(&bucket->conns, &conn->key_node[CT_DIR_REV].cm_node, hash);
>      }
> -    ovs_list_remove(&conn->exp_node);
> -    conn->cleaned = true;
>      ovsrcu_postpone(delete_conn, conn);
>      atomic_count_dec(&ct->n_conn);
>  }
> @@ -446,22 +450,35 @@ void
>  conntrack_destroy(struct conntrack *ct)
>  {
>      struct conn_key_node *keyn;
> +    struct ct_bucket *bucket;
>      struct conn *conn;
> +    int i;
> 
>      latch_set(&ct->clean_thread_exit);
>      pthread_join(ct->clean_thread, NULL);
>      latch_destroy(&ct->clean_thread_exit);
> 
> -    ovs_mutex_lock(&ct->ct_lock);
> -    CMAP_FOR_EACH (keyn, cm_node, &ct->conns) {
> -        if (keyn->key.dir != CT_DIR_FWD) {
> -            continue;
> +    for (i = 0; i < CONNTRACK_BUCKETS; i++) {
> +        bucket = &ct->buckets[i];
> +        CMAP_FOR_EACH (keyn, cm_node, &bucket->conns) {
> +            if (keyn->key.dir != CT_DIR_FWD) {
> +                continue;
> +            }
> +            conn = CONTAINER_OF(keyn, struct conn, key_node[keyn->key.dir]);
> +            conn_clean(ct, conn);
>          }
> -        conn = CONTAINER_OF(keyn, struct conn, key_node[keyn->key.dir]);
> -        conn_clean(ct, conn);
>      }
> -    cmap_destroy(&ct->conns);
> 
> +    /* XXX: we need this loop because connections may be in multiple
> +     * buckets.  The former loop should probably use conn_clean__()
> +     * or an unlocked version of conn_clean(). */
> +    for (i = 0; i < CONNTRACK_BUCKETS; i++) {
> +        bucket = &ct->buckets[i];
> +        ovs_mutex_destroy(&bucket->lock);
> +        cmap_destroy(&ct->buckets[i].conns);
> +    }
> +
> +    ovs_mutex_lock(&ct->ct_lock);
>      struct zone_limit *zl;
>      CMAP_FOR_EACH (zl, node, &ct->zone_limits) {
>          uint32_t hash = zone_key_hash(zl->czl.zone, ct->hash_basis);
> @@ -498,45 +515,108 @@ conntrack_destroy(struct conntrack *ct)
>  }
> 
> 
> +static unsigned hash_scale(uint32_t hash)
> +{
> +    return (hash >> (32 - CONNTRACK_BUCKETS_SHIFT)) % CONNTRACK_BUCKETS;
> +}
> +

I am not sure I understand this.

You have a hash of width 32 bits.
You shift right by 22 bits (10 bits remaining).
Then you use modulo with a power of two, effectively doing bitwise & on the
10 LSBs, which seems redundant at this point.

Is there a reason to prefer the MSBs to the LSBs? Is there a specific property of the hash function that requires it?
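
To illustrate: since CONNTRACK_BUCKETS == 1 << CONNTRACK_BUCKETS_SHIFT, the
modulo is a no-op after the shift, so the two candidate forms boil down to
the following (illustrative only, not part of the patch):

static unsigned
hash_scale_msb(uint32_t hash)
{
    return hash >> (32 - CONNTRACK_BUCKETS_SHIFT); /* top 10 bits */
}

static unsigned
hash_scale_lsb(uint32_t hash)
{
    return hash & (CONNTRACK_BUCKETS - 1); /* bottom 10 bits */
}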

>  static bool
> -conn_key_lookup(struct conntrack *ct, const struct conn_key *key,
> -                uint32_t hash, long long now, struct conn **conn_out,
> +conn_key_lookup(struct conntrack *ct, unsigned bucket,
> +                const struct conn_key *key, uint32_t hash,
> +                long long now, struct conn **conn_out,
>                  bool *reply)
>  {
> +    struct ct_bucket *ctb = &ct->buckets[bucket];
>      struct conn_key_node *keyn;
> -    struct conn *conn = NULL;
>      bool found = false;
> +    struct conn *conn;
> 
> -    CMAP_FOR_EACH_WITH_HASH (keyn, cm_node, hash, &ct->conns) {
> +    CMAP_FOR_EACH_WITH_HASH (keyn, cm_node, hash, &ctb->conns) {
>          conn = CONTAINER_OF(keyn, struct conn, key_node[keyn->key.dir]);
> +        if (conn_expired(conn, now)) {
> +            conn_clean(ct, conn);
> +            continue;
> +        }
> +
>          for (int i = CT_DIR_FWD; i < CT_DIR_MAX; i++) {
> -            if (!conn_key_cmp(&conn->key_node[i].key, key) &&
> -                !conn_expired(conn, now)) {
> +            if (!conn_key_cmp(&conn->key_node[i].key, key)) {
>                  found = true;
>                  if (reply) {
>                      *reply = i;
>                  }
> -                goto out_found;
> +
> +                goto conn_out_found;
>              }
>          }
>      }
> 
> -out_found:
> -    if (found && conn_out) {
> -        *conn_out = conn;
> -    } else if (conn_out) {
> -        *conn_out = NULL;
> +conn_out_found:
> +    if (conn_out) {
> +        *conn_out = found ? conn : NULL;
>      }
> 
>      return found;
>  }
> 
> +static void
> +buckets_unlock(struct conntrack *ct, uint32_t h1, uint32_t h2)

ct_buckets_unlock?

> +{
> +    unsigned p1 = hash_scale(h1),
> +        p2 = hash_scale(h2);
> +
> +    if (p1 > p2) {

Keeping the comparison the same as in buckets_lock() makes it possible to see
at a glance that the unlock order is the reverse of the lock order, which
IMO helps to confirm that it is done correctly.

So I think (p1 < p2) there and then swapping p1 and p2 below would be easier to read.
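
Something like the following, i.e. the same behavior with the branches
mirroring buckets_lock():

static void
buckets_unlock(struct conntrack *ct, uint32_t h1, uint32_t h2)
{
    unsigned p1 = hash_scale(h1), p2 = hash_scale(h2);

    if (p1 < p2) {
        ovs_mutex_unlock(&ct->buckets[p2].lock);
        ovs_mutex_unlock(&ct->buckets[p1].lock);
    } else if (p1 > p2) {
        ovs_mutex_unlock(&ct->buckets[p1].lock);
        ovs_mutex_unlock(&ct->buckets[p2].lock);
    } else {
        ovs_mutex_unlock(&ct->buckets[p1].lock);
    }
}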

> +        ovs_mutex_unlock(&ct->buckets[p1].lock);
> +        ovs_mutex_unlock(&ct->buckets[p2].lock);
> +    } else if (p1 < p2) {
> +        ovs_mutex_unlock(&ct->buckets[p2].lock);
> +        ovs_mutex_unlock(&ct->buckets[p1].lock);
> +    } else {
> +        ovs_mutex_unlock(&ct->buckets[p1].lock);
> +    }
> +}
> +
> +/* Acquires both locks in an ordered way. */
> +static void
> +buckets_lock(struct conntrack *ct, uint32_t h1, uint32_t h2)
> +{
> +    unsigned p1 = hash_scale(h1),
> +        p2 = hash_scale(h2);
> +
> +    if (p1 < p2) {
> +        ovs_mutex_lock(&ct->buckets[p1].lock);
> +        ovs_mutex_lock(&ct->buckets[p2].lock);
> +    } else if (p1 > p2) {
> +        ovs_mutex_lock(&ct->buckets[p2].lock);
> +        ovs_mutex_lock(&ct->buckets[p1].lock);
> +    } else {
> +        ovs_mutex_lock(&ct->buckets[p1].lock);
> +    }
> +}
> +
> +static void
> +conn_clean(struct conntrack *ct, struct conn *conn)
> +{
> +    uint32_t h1, h2;
> +
> +    if (atomic_flag_test_and_set(&conn->cleaned)) {
> +        return;
> +    }
> +
> +    h1 = cached_key_hash(&conn->key_node[CT_DIR_FWD]);
> +    h2 = cached_key_hash(&conn->key_node[CT_DIR_REV]);
> +    buckets_lock(ct, h1, h2);
> +    conn_clean__(ct, conn);
> +    buckets_unlock(ct, h1, h2);
> +}
> +
>  static bool
>  conn_lookup(struct conntrack *ct, const struct conn_key *key,
>              long long now, struct conn **conn_out, bool *reply)
>  {
>      uint32_t hash = conn_key_hash(key, ct->hash_basis);
> -    return conn_key_lookup(ct, key, hash, now, conn_out, reply);
> +    unsigned bucket = hash_scale(hash);
> +
> +    return conn_key_lookup(ct, bucket, key, hash, now, conn_out, reply);
>  }
> 
>  static void
> @@ -944,7 +1024,6 @@ conn_not_found(struct conntrack *ct, struct dp_packet *pkt,
>                 const struct nat_action_info_t *nat_action_info,
>                 const char *helper, const struct alg_exp_node *alg_exp,
>                 enum ct_alg_ctl_type ct_alg_ctl, uint32_t tp_id)
> -    OVS_REQUIRES(ct->ct_lock)
>  {
>      struct conn *nc = NULL;
>      uint32_t rev_hash = ctx->hash;
> @@ -954,6 +1033,8 @@ conn_not_found(struct conntrack *ct, struct dp_packet *pkt,
>          return nc;
>      }
> 
> +    /* XXX: We are unlocked here, so we don't know
> +     * if the tuple already exists in the table. */

Can you expand more on the issue here?
Shouldn't we be locked for the lookup?

>      pkt->md.ct_state = CS_NEW;
> 
>      if (alg_exp) {
> @@ -961,10 +1042,11 @@ conn_not_found(struct conntrack *ct, struct dp_packet *pkt,
>      }
> 
>      if (commit) {
> -        struct conn_key_node *fwd_key_node, *rev_key_node;
> -
>          struct zone_limit *zl = zone_limit_lookup_or_default(ct,
>                                                               ctx->key.zone);
> +        struct conn_key_node *fwd_key_node, *rev_key_node;
> +        bool handle_tuple = false;
> +
>          if (zl && atomic_count_get(&zl->czl.count) >= zl->czl.limit) {
>              return nc;
>          }
> @@ -1007,22 +1089,40 @@ conn_not_found(struct conntrack *ct, struct dp_packet *pkt,
>                      nc->nat_action = NAT_ACTION_DST;
>                  }
>              } else {
> -                bool nat_res = nat_get_unique_tuple(ct, nc, nat_action_info);
> -
> -                if (!nat_res) {
> -                    goto nat_res_exhaustion;
> -                }
> +                handle_tuple = true;
>              }
> -
> -            nat_packet(pkt, nc, ctx->icmp_related);
> -            rev_hash = conn_key_hash(&rev_key_node->key, ct->hash_basis);
> -            rev_key_node->key.dir = CT_DIR_REV;
> -            cmap_insert(&ct->conns, &rev_key_node->cm_node, rev_hash);
>          }
> 
>          ovs_mutex_init_adaptive(&nc->lock);
> +        atomic_flag_clear(&nc->cleaned);
>          fwd_key_node->key.dir = CT_DIR_FWD;
> -        cmap_insert(&ct->conns, &fwd_key_node->cm_node, ctx->hash);
> +        rev_key_node->key.dir = CT_DIR_REV;
> +
> +        if (handle_tuple) {
> +            bool nat_res = nat_get_unique_tuple_lock(ct, nc, nat_action_info,
> +                                                     &rev_hash);
> +
> +            if (!nat_res) {
> +                goto out_error;
> +            }
> +        } else {
> +            rev_hash = conn_key_hash(&rev_key_node->key, ct->hash_basis);
> +            buckets_lock(ct, ctx->hash, rev_hash);
> +        }
> +
> +        if (conn_lookup(ct, &ctx->key, now, NULL, NULL)) {
> +            goto out_error_unlock;
> +        }
> +
> +        fwd_key_node->key_hash = ctx->hash;
> +        cmap_insert(&ct->buckets[hash_scale(ctx->hash)].conns,
> +                    &fwd_key_node->cm_node, ctx->hash);
> +        if (nat_action_info) {
> +            rev_key_node->key_hash = rev_hash;
> +            cmap_insert(&ct->buckets[hash_scale(rev_hash)].conns,
> +                        &rev_key_node->cm_node, rev_hash);
> +            nat_packet(pkt, nc, ctx->icmp_related);
> +        }
> 
>          atomic_count_inc(&ct->n_conn);
>          ctx->conn = nc; /* For completeness. */
> @@ -1033,21 +1133,23 @@ conn_not_found(struct conntrack *ct, struct dp_packet *pkt,
>          } else {
>              nc->admit_zone = INVALID_ZONE;
>          }
> +        buckets_unlock(ct, ctx->hash, rev_hash);
>      }
> 
>      return nc;
> 
> +out_error_unlock:
> +    buckets_unlock(ct, ctx->hash, rev_hash);
>      /* This would be a user error or a DOS attack.  A user error is prevented
>       * by allocating enough combinations of NAT addresses when combined with
>       * ephemeral ports.  A DOS attack should be protected against with
>       * firewall rules or a separate firewall.  Also using zone partitioning
>       * can limit DoS impact. */
> -nat_res_exhaustion:
> -    ovs_list_remove(&nc->exp_node);
> +out_error:
> +    ovs_mutex_destroy(&nc->lock);
>      delete_conn_cmn(nc);
>      static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
> -    VLOG_WARN_RL(&rl, "Unable to NAT due to tuple space exhaustion - "
> -                 "if DoS attack, use firewalling and/or zone partitioning.");
> +    VLOG_WARN_RL(&rl, "Unable to insert a new connection.");
>      return NULL;
>  }
> 
> @@ -1082,12 +1184,7 @@ conn_update_state(struct conntrack *ct, struct dp_packet *pkt,
>              pkt->md.ct_state = CS_INVALID;
>              break;
>          case CT_UPDATE_NEW:
> -            ovs_mutex_lock(&ct->ct_lock);
> -            if (conn_lookup(ct, &conn->key_node[CT_DIR_FWD].key,
> -                            now, NULL, NULL)) {
> -                conn_clean(ct, conn);
> -            }
> -            ovs_mutex_unlock(&ct->ct_lock);
> +            conn_clean(ct, conn);
>              create_new_conn = true;
>              break;
>          case CT_UPDATE_VALID_NEW:
> @@ -1253,6 +1350,8 @@ static void
>  initial_conn_lookup(struct conntrack *ct, struct conn_lookup_ctx *ctx,
>                      long long now, bool natted)
>  {
> +    unsigned bucket = hash_scale(ctx->hash);
> +
>      if (natted) {
>          /* If the packet has been already natted (e.g. a previous
>           * action took place), retrieve it performing a lookup of its
> @@ -1260,7 +1359,8 @@ initial_conn_lookup(struct conntrack *ct, struct conn_lookup_ctx *ctx,
>          conn_key_reverse(&ctx->key);
>      }
> 
> -    conn_key_lookup(ct, &ctx->key, ctx->hash, now, &ctx->conn, &ctx->reply);
> +    conn_key_lookup(ct, bucket, &ctx->key, ctx->hash,
> +                    now, &ctx->conn, &ctx->reply);
> 
>      if (natted) {
>          if (OVS_LIKELY(ctx->conn)) {
> @@ -1287,24 +1387,20 @@ process_one(struct conntrack *ct, struct dp_packet *pkt,
>              ovs_be16 tp_src, ovs_be16 tp_dst, const char *helper,
>              uint32_t tp_id)
>  {
> +    bool create_new_conn = false;
> +
>      /* Reset ct_state whenever entering a new zone. */
>      if (pkt->md.ct_state && pkt->md.ct_zone != zone) {
>          pkt->md.ct_state = 0;
>      }
> 
> -    bool create_new_conn = false;
>      initial_conn_lookup(ct, ctx, now, !!(pkt->md.ct_state &
>                                           (CS_SRC_NAT | CS_DST_NAT)));
>      struct conn *conn = ctx->conn;
> 
>      /* Delete found entry if in wrong direction. 'force' implies commit. */
>      if (OVS_UNLIKELY(force && ctx->reply && conn)) {
> -        ovs_mutex_lock(&ct->ct_lock);
> -        if (conn_lookup(ct, &conn->key_node[CT_DIR_FWD].key,
> -                        now, NULL, NULL)) {
> -            conn_clean(ct, conn);
> -        }
> -        ovs_mutex_unlock(&ct->ct_lock);
> +        conn_clean(ct, conn);
>          conn = NULL;
>      }
> 
> @@ -1338,7 +1434,6 @@ process_one(struct conntrack *ct, struct dp_packet *pkt,
>      struct alg_exp_node alg_exp_entry;
> 
>      if (OVS_UNLIKELY(create_new_conn)) {
> -
>          ovs_rwlock_rdlock(&ct->resources_lock);
>          alg_exp = expectation_lookup(&ct->alg_expectations, &ctx->key,
>                                       ct->hash_basis,
> @@ -1349,12 +1444,9 @@ process_one(struct conntrack *ct, struct dp_packet *pkt,
>          }
>          ovs_rwlock_unlock(&ct->resources_lock);
> 
> -        ovs_mutex_lock(&ct->ct_lock);
> -        if (!conn_lookup(ct, &ctx->key, now, NULL, NULL)) {
> -            conn = conn_not_found(ct, pkt, ctx, commit, now, nat_action_info,
> -                                  helper, alg_exp, ct_alg_ctl, tp_id);
> -        }
> -        ovs_mutex_unlock(&ct->ct_lock);
> +        conn = conn_not_found(ct, pkt, ctx, commit, now,
> +                              nat_action_info, helper, alg_exp,
> +                              ct_alg_ctl, tp_id);
>      }
> 
>      write_ct_md(pkt, zone, conn, &ctx->key, alg_exp);
> @@ -1467,83 +1559,92 @@ set_label(struct dp_packet *pkt, struct conn *conn,
>  }
> 
> 
> -/* Delete the expired connections from 'ctb', up to 'limit'. Returns the
> - * earliest expiration time among the remaining connections in 'ctb'.  Returns
> - * LLONG_MAX if 'ctb' is empty.  The return value might be smaller than 'now',
> - * if 'limit' is reached */
> +/* Delete the expired connections from 'bucket', up to 'limit'.
> + * Returns the earliest expiration time among the remaining
> + * connections in 'bucket'.  Returns LLONG_MAX if 'bucket' is empty.
> + * The return value might be smaller than 'now', if 'limit' is
> + * reached. */
>  static long long
> -ct_sweep(struct conntrack *ct, long long now, size_t limit)
> +sweep_bucket(struct conntrack *ct, struct ct_bucket *bucket,
> +             long long now)
>  {
> -    struct conn *conn, *next;
> -    long long min_expiration = LLONG_MAX;
> -    size_t count = 0;
> +    struct conn_key_node *keyn;
> +    unsigned int conn_count = 0;
> +    struct conn *conn;
> +    long long expiration;
> 
> -    ovs_mutex_lock(&ct->ct_lock);
> +    CMAP_FOR_EACH (keyn, cm_node, &bucket->conns) {
> +        if (keyn->key.dir != CT_DIR_FWD) {
> +            continue;
> +        }
> 
> -    for (unsigned i = 0; i < N_CT_TM; i++) {
> -        LIST_FOR_EACH_SAFE (conn, next, exp_node, &ct->exp_lists[i]) {
> -            ovs_mutex_lock(&conn->lock);
> -            if (now < conn->expiration || count >= limit) {
> -                min_expiration = MIN(min_expiration, conn->expiration);
> -                ovs_mutex_unlock(&conn->lock);
> -                if (count >= limit) {
> -                    /* Do not check other lists. */
> -                    COVERAGE_INC(conntrack_long_cleanup);
> -                    goto out;
> -                }
> -                break;
> -            } else {
> -                ovs_mutex_unlock(&conn->lock);
> -                conn_clean(ct, conn);
> -            }
> -            count++;
> +        conn = CONTAINER_OF(keyn, struct conn, key_node[keyn->key.dir]);
> +        ovs_mutex_lock(&conn->lock);
> +        expiration = conn->expiration;
> +        ovs_mutex_unlock(&conn->lock);

Atomic expiration might be worth considering as well.
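
For example (a sketch, assuming 'expiration' were turned into an atomic_llong
in struct conn), the sweeper could then skip taking conn->lock here:

        long long expiration;

        atomic_read_relaxed(&conn->expiration, &expiration);
        if (now >= expiration) {
            conn_clean(ct, conn);
        }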

> +
> +        if (now >= expiration) {
> +            conn_clean(ct, conn);
>          }
> +
> +        conn_count++;
>      }
> 
> -out:
> -    VLOG_DBG("conntrack cleanup %"PRIuSIZE" entries in %lld msec", count,
> -             time_msec() - now);
> -    ovs_mutex_unlock(&ct->ct_lock);
> -    return min_expiration;
> +    return conn_count;
>  }
> 
> -/* Cleans up old connection entries from 'ct'.  Returns the time when the
> - * next expiration might happen.  The return value might be smaller than
> - * 'now', meaning that an internal limit has been reached, and some expired
> - * connections have not been deleted. */
> +/* Cleans up old connection entries from 'ct'.  Returns the the next
> + * wake up time.  The return value might be smaller than 'now', meaning
> + * that an internal limit has been reached, that is, the table
> + * hasn't been entirely scanned. */
>  static long long
>  conntrack_clean(struct conntrack *ct, long long now)
>  {
> -    unsigned int n_conn_limit;
> +    long long next_wakeup = now + 90 * 1000;
> +    unsigned int n_conn_limit, i, count = 0;
> +    size_t clean_end;
> +
>      atomic_read_relaxed(&ct->n_conn_limit, &n_conn_limit);
> -    size_t clean_max = n_conn_limit > 10 ? n_conn_limit / 10 : 1;
> -    long long min_exp = ct_sweep(ct, now, clean_max);
> -    long long next_wakeup = MIN(min_exp, now + CT_DPIF_NETDEV_TP_MIN);
> +    clean_end = n_conn_limit / 64;

I don't know if the change in heuristics is better or not here.
Which metrics should be considered relevant? CPU load? Latency?

> +
> +    for (i = ct->next_bucket; i < CONNTRACK_BUCKETS; i++) {
> +        struct ct_bucket *bucket = &ct->buckets[i];
> +
> +        count += sweep_bucket(ct, bucket, now);
> +
> +        if (count > clean_end) {
> +            next_wakeup = 0;
> +            break;
> +        }
> +    }
> +
> +    ct->next_bucket = (i < CONNTRACK_BUCKETS) ? i : 0;
> 
>      return next_wakeup;
>  }
> 
>  /* Cleanup:
>   *
> - * We must call conntrack_clean() periodically.  conntrack_clean() return
> - * value gives an hint on when the next cleanup must be done (either because
> - * there is an actual connection that expires, or because a new connection
> - * might be created with the minimum timeout).
> + * We must call conntrack_clean() periodically.  conntrack_clean()
> + * return value gives an hint on when the next cleanup must be done
> + * (either because there is still work to do, or because a new
> + * connection might be created).
>   *
>   * The logic below has two goals:
>   *
> - * - We want to reduce the number of wakeups and batch connection cleanup
> - *   when the load is not very high.  CT_CLEAN_INTERVAL ensures that if we
> - *   are coping with the current cleanup tasks, then we wait at least
> - *   5 seconds to do further cleanup.
> + * - When the load is high, we want to avoid to hog the CPU scanning
> + *   all the buckets and their respective CMAPs "at once". For this
> + *   reason, every batch cleanup aims to scan at most n_conn_limit /
> + *   64 entries (more if the buckets contains many entrie) before
> + *   yielding the CPU. In this case, the next wake up will happen in
> + *   CT_CLEAN_MIN_INTERVAL_MS and the scan will resume starting from
> + *   the first bucket not scanned.
>   *
> - * - We don't want to keep the map locked too long, as we might prevent
> - *   traffic from flowing.  CT_CLEAN_MIN_INTERVAL ensures that if cleanup is
> - *   behind, there is at least some 200ms blocks of time when the map will be
> - *   left alone, so the datapath can operate unhindered.
> - */
> -#define CT_CLEAN_INTERVAL 5000 /* 5 seconds */
> -#define CT_CLEAN_MIN_INTERVAL 200  /* 0.2 seconds */
> + * - We also don't want to scan the buckets so frequently, as going
> + *   through all the connections, during high loads, may be costly in
> + *   terms of CPU time. In this case the next wake up is set to 90
> + *   seconds. */
> +#define CT_CLEAN_MIN_INTERVAL_MS 100  /* 0.1 seconds */
> 
>  static void *
>  clean_thread_main(void *f_)
> @@ -1556,9 +1657,9 @@ clean_thread_main(void *f_)
>          next_wake = conntrack_clean(ct, now);
> 
>          if (next_wake < now) {
> -            poll_timer_wait_until(now + CT_CLEAN_MIN_INTERVAL);
> +            poll_timer_wait_until(now + CT_CLEAN_MIN_INTERVAL_MS);
>          } else {
> -            poll_timer_wait_until(MAX(next_wake, now + CT_CLEAN_INTERVAL));
> +            poll_timer_wait_until(next_wake);
>          }
>          latch_wait(&ct->clean_thread_exit);
>          poll_block();
> @@ -2088,6 +2189,12 @@ ct_endpoint_hash_add(uint32_t hash, const struct ct_endpoint *ep)
>      return hash_add_bytes32(hash, (const uint32_t *) ep, sizeof *ep);
>  }
> 
> +static uint32_t
> +cached_key_hash(struct conn_key_node *n)
> +{
> +    return n->key_hash;
> +}
> +
>  /* Symmetric */
>  static uint32_t
>  conn_key_hash(const struct conn_key *key, uint32_t basis)
> @@ -2357,8 +2464,9 @@ next_addr_in_range_guarded(union ct_addr *curr, union ct_addr *min,
>   *
>   * If none can be found, return exhaustion to the caller. */
>  static bool
> -nat_get_unique_tuple(struct conntrack *ct, struct conn *conn,
> -                     const struct nat_action_info_t *nat_info)
> +nat_get_unique_tuple_lock(struct conntrack *ct, struct conn *conn,
> +                          const struct nat_action_info_t *nat_info,
> +                          uint32_t *rev_hash)
>  {
>      union ct_addr min_addr = {0}, max_addr = {0}, curr_addr = {0},
>                    guard_addr = {0};
> @@ -2392,10 +2500,15 @@ another_round:
>                        nat_info->nat_action);
> 
>      if (!pat_proto) {
> +        uint32_t key_hash = conn_key_hash(fwd_key, ct->hash_basis);
> +        *rev_hash = conn_key_hash(rev_key, ct->hash_basis);
> +
> +        buckets_lock(ct, key_hash, *rev_hash);
>          if (!conn_lookup(ct, rev_key,
>                           time_msec(), NULL, NULL)) {
>              return true;
>          }
> +        buckets_unlock(ct, key_hash, *rev_hash);
> 
>          goto next_addr;
>      }
> @@ -2404,10 +2517,15 @@ another_round:
>          rev_key->src.port = htons(curr_dport);
>          FOR_EACH_PORT_IN_RANGE(curr_sport, min_sport, max_sport) {
>              rev_key->dst.port = htons(curr_sport);
> +            uint32_t key_hash = conn_key_hash(fwd_key, ct->hash_basis);
> +            *rev_hash = conn_key_hash(rev_key, ct->hash_basis);
> +
> +            buckets_lock(ct, key_hash, *rev_hash);
>              if (!conn_lookup(ct, rev_key,
>                               time_msec(), NULL, NULL)) {
>                  return true;
>              }
> +            buckets_unlock(ct, key_hash, *rev_hash);
>          }
>      }
> 
> @@ -2615,20 +2733,39 @@ conntrack_dump_next(struct conntrack_dump *dump, struct ct_dpif_entry *entry)
>  {
>      struct conntrack *ct = dump->ct;
>      long long now = time_msec();
> +    struct ct_bucket *bucket;
> 
> -    for (;;) {
> -        struct cmap_node *cm_node = cmap_next_position(&ct->conns,
> -                                                       &dump->cm_pos);
> -        if (!cm_node) {
> -            break;
> +    while (dump->bucket < CONNTRACK_BUCKETS) {
> +        struct cmap_node *cm_node;
> +        bucket = &ct->buckets[dump->bucket];
> +
> +        for (;;) {
> +            cm_node = cmap_next_position(&bucket->conns,
> +                                         &dump->cm_pos);
> +            if (!cm_node) {
> +                break;
> +            }
> +            struct conn_key_node *keyn;
> +            struct conn *conn;
> +            INIT_CONTAINER(keyn, cm_node, cm_node);
>             conn = CONTAINER_OF(keyn, struct conn, key_node[keyn->key.dir]);
> +
> +            if (conn_expired(conn, now)) {
> +                /* XXX: ideally this should call conn_clean(). */
> +                continue;
> +            }
> +
> +            if ((!dump->filter_zone || keyn->key.zone == dump->zone) &&
> +                (keyn->key.dir == CT_DIR_FWD)) {
> +                conn_to_ct_dpif_entry(conn, entry, now);
> +                break;
> +            }
>          }
> -        struct conn_key_node *keyn;
> -        struct conn *conn;
> -        INIT_CONTAINER(keyn, cm_node, cm_node);
> -        conn = CONTAINER_OF(keyn, struct conn, key_node[keyn->key.dir]);
> -        if ((!dump->filter_zone || keyn->key.zone == dump->zone) &&
> -            (keyn->key.dir == CT_DIR_FWD)) {
> -            conn_to_ct_dpif_entry(conn, entry, now);
> +
> +        if (!cm_node) {
> +            memset(&dump->cm_pos, 0, sizeof dump->cm_pos);
> +            dump->bucket++;
> +        } else {
>              return 0;
>          }
>      }
> @@ -2648,17 +2785,18 @@ conntrack_flush(struct conntrack *ct, const uint16_t *zone)
>      struct conn_key_node *keyn;
>      struct conn *conn;
> 
> -    ovs_mutex_lock(&ct->ct_lock);
> -    CMAP_FOR_EACH (keyn, cm_node, &ct->conns) {
> -        if (keyn->key.dir != CT_DIR_FWD) {
> -            continue;
> -        }
> -        conn = CONTAINER_OF(keyn, struct conn, key_node[keyn->key.dir]);
> -        if (!zone || *zone == keyn->key.zone) {
> -            conn_clean(ct, conn);
> +    for (unsigned i = 0; i < CONNTRACK_BUCKETS; i++) {
> +        CMAP_FOR_EACH (keyn, cm_node, &ct->buckets[i].conns) {
> +            if (keyn->key.dir != CT_DIR_FWD) {
> +                continue;
> +            }
> +
> +            conn = CONTAINER_OF(keyn, struct conn, key_node[keyn->key.dir]);
> +            if (!zone || *zone == keyn->key.zone) {
> +                conn_clean(ct, conn);
> +            }
>          }
>      }
> -    ovs_mutex_unlock(&ct->ct_lock);
> 
>      return 0;
>  }
> @@ -2667,15 +2805,19 @@ int
>  conntrack_flush_tuple(struct conntrack *ct, const struct ct_dpif_tuple *tuple,
>                        uint16_t zone)
>  {
> -    int error = 0;
>      struct conn_key key;
> -    struct conn *conn;
> +    struct conn *conn = NULL;
> +    unsigned bucket;
> +    uint32_t hash;
> +    int error = 0;
> 
>      memset(&key, 0, sizeof(key));
>      tuple_to_conn_key(tuple, zone, &key);
> -    ovs_mutex_lock(&ct->ct_lock);
> -    conn_lookup(ct, &key, time_msec(), &conn, NULL);
> 
> +    hash = conn_key_hash(&key, ct->hash_basis);
> +    bucket = hash_scale(hash);
> +
> +    conn_key_lookup(ct, bucket, &key, hash, time_msec(), &conn, NULL);
>      if (conn) {
>          conn_clean(ct, conn);
>      } else {
> @@ -2683,7 +2825,6 @@ conntrack_flush_tuple(struct conntrack *ct, const struct ct_dpif_tuple *tuple,
>          error = ENOENT;
>      }
> 
> -    ovs_mutex_unlock(&ct->ct_lock);
>      return error;
>  }
> 

Best regards,
Paolo Valerio Jan. 31, 2022, 10:04 p.m. UTC | #2
Hi Gaetan,

thank you for the feedback

Gaëtan Rivet <grive@u256.net> writes:

> Hi Paolo,
>
> On Mon, Nov 29, 2021, at 19:06, Paolo Valerio wrote:
>> The purpose of this commit is to split the current way of storing the
>> conn nodes. Before this patch the nodes were stored into a single cmap
>> using ct->lock to avoid concurrent write access.
>> With this commit a single connection can be stored into one or two (at
>> most) CONNTRACK_BUCKETS available based on the outcome of the function
>> hash_scale() on the key.
>
> I think this approach overall works.
> Removing the expiration list altogether might be worth it, but it would be nice
> to have data to back it up. It simplifies expiration update and removes
> another shared data structure, which is nice. However to accept the change
> we must ensure that the ct sweep is within acceptable bounds.
>
> How do the two solutions compare regarding:
>
>   * expiration latency (delta between expected expiration time vs. actual).
>   * Memory consumption: depending on the structure, it's one additional node per conn.
>     For a list, that represents 2x64 bits.
>     Not that much, but that's a benefit of removing the exp list.
>   * CPU utilization / average cycles per conn: how heavy is it on the CPU to
>     iterate a CMAP vs. a linked-list?
>     CMAPs are heavy in coherency traffic due to constant atomic reads, vs. a list
>     that is mostly pointer dereference.
>
> Maybe a micro-benchmark in tests/test-conntrack.c for the aging process could help
> measure those metrics isolated from the rest of OVS.
>
> That seems like quite a bit of additional work; maybe you would be able to provide a relevant
> comparison without writing this additional benchmark.
> On the other hand, maybe it only consists in inserting N conns, artificially advancing
> the internal OVS clock by the timeout and measuring the behavior. I'm not sure.
>

I'll try to collect some numbers.
I'll share them once available.

> I measured the time to insert 1M connections with your patchset, my ct-scale series and
> the current baseline:
>
> n-thread\version   baseline  ct-scale  ct-bucket
>                1    831 ms    737 ms    715 ms
>                2   1177 ms    882 ms    867 ms
>                3   1524 ms   1000 ms   1012 ms
>                4   2595 ms   1128 ms   1139 ms
>
> So overall, at least on insertion in conntrack_execute, it seems your solution is
> pretty much on par with the series I proposed, and both are improvements compared to the current baseline.
>
> Scaling is not yet linear though, and the mutex locking still appears in profiling with
> multiple threads.
>
>> Every bucket has its local lock that needs to be acquired every time a
>> node has to be removed/inserted from/to the cmap.
>> This means that, in case the hash of the CT_DIR_FWD key differs from
>> the one of the CT_DIR_REV, we can end up having the reference of the
>> two key nodes in different buckets, and consequently acquiring two locks
>> (one per bucket).
>> This approach may be handy in different ways, depending on the way the
>> stale connection removal gets designed. The attempt of this patch is
>> to remove the expiration lists, removing the stale entries mostly in
>> two ways:
>>
>> - during the key lookup
>> - when the sweeper task wakes up
>>
>> the first case is not very strict, as we remove only expired entries
>> with the same hash. To increase its effectiveness, we should probably
>> increase the number of buckets and replace the cmaps with other data
>> structures like rcu lists.
>
> Did you try different numbers of buckets?
> I see 2^10 being used now; it seems a bit large to me. Does it provide tangible
> benefits? Did you measure contention vs. 512 or 256 buckets?

For the time being, I stuck with the old number of buckets (pre
RCU patch series).

The number of buckets, though, should not be too small because of the
sweeper task.
One of the tests I performed to see how the sweeper task behaves
involved SYN floods. In such a test, if the number of new connections per
second is relatively high, having a limited number of potential
connections per CMAP when the sweeper kicks in helps it complete the
sweeping phase without taking too long. Conversely, a very small number
of buckets may not be optimal.

I think the values you proposed should not be problematic. I didn't
really test other values extensively, but it's worth a try, so I will do that.

>
> Replacing a CMAP by N rculists seems to be re-implementing a sharded CMAP with
> one lock per internal bucket in a sense. However, the bucket array would be static
> and would not resize the way the CMAP does.
>
> It would result in a poorly implemented hash map with integrated
> locks and a static number of buckets. It should either be possible to find a solution
> with the existing structures, or a new structure should instead be cleanly designed.
>

I suppose it could be resizable in the sense that we could change the size
through a configuration knob (acquiring all the locks, which would be no
more than 1024 in any case), but I agree that it would be something very
custom and very specific to this use case. If we need to replace that,
we should probably evaluate all the options, including something
different and more cleanly designed that could fit the needs of this use
case.

>> The sweeper task instead takes charge of the remaining stale entries
>> removal. The heuristics used in the sweeper task are mostly an
>> example, but could be modified to match any possible uncovered use
>> case.
>
> Which metric did you use to choose the new heuristics for the cleanup?
>

Nothing particularly refined. The idea was to avoid hogging the CPU.

> Otherwise I have a few remarks or suggestions below.
>
>>
>> Signed-off-by: Paolo Valerio <pvalerio@redhat.com>
>> ---
>> The cover letter includes further details.
>> ---
>>  lib/conntrack-private.h |   34 +++
>>  lib/conntrack-tp.c      |   42 ----
>>  lib/conntrack.c         |  461 +++++++++++++++++++++++++++++++----------------
>>  tests/system-traffic.at |    5 -
>>  4 files changed, 331 insertions(+), 211 deletions(-)
>>
>> diff --git a/lib/conntrack-private.h b/lib/conntrack-private.h
>> index ea5ba3d9e..a89ff96fa 100644
>> --- a/lib/conntrack-private.h
>> +++ b/lib/conntrack-private.h
>> @@ -95,6 +95,7 @@ struct alg_exp_node {
>> 
>>  struct conn_key_node {
>>      struct conn_key key;
>> +    uint32_t key_hash;
>
> Caching the key hash is necessary, for quick lookup and deletion
> in the cmap.
>
> Unfortunately, as you need to use hash_scale() to find the corresponding
> bucket, you are unable to use the OVS_ACQUIRES / OVS_RELEASES marks on
> buckets_{lock,unlock}(), which throws off clang's safety analysis.
>
> This safety analysis is very useful. The basic workaround would be to mark those functions
> with OVS_NO_THREAD_SAFETY_ANALYSIS but it is not a good solution as it will mask issues.
>
> Instead, a possible way is to store there the key_bucket, which will be the
> bucket index corresponding to this key_hash. Then:
>
> buckets_lock(ct, p1, p2)
>    OVS_ACQUIRES(ct->ct_buckets[p1].lock, ct->ct_buckets[p2].lock)
>
> buckets_unlock(ct, p1, p2)
>    OVS_RELEASES(ct->ct_buckets[p1].lock, ct->ct_buckets[p2].lock)
>
> where p1 == conn->key_node[0].key_bucket;
> (by the way, those functions might be better renamed ct_bucket_lock() I think).
>
> Not sure this solution would work, but I think it's definitely worth trying to find one.
> If you select CC=clang with ./configure, -Wthread-safety should be automatically used.
>

I agree, we should try hard to keep it.
I'll start taking a look at your suggestion.

>>      struct cmap_node cm_node;
>>  };
>> 
>> @@ -102,7 +103,6 @@ struct conn {
>>      /* Immutable data. */
>>      struct conn_key_node key_node[CT_DIR_MAX];
>>      struct conn_key parent_key; /* Only used for orig_tuple support. */
>> -    struct ovs_list exp_node;
>> 
>>      uint16_t nat_action;
>>      char *alg;
>> @@ -121,7 +121,9 @@ struct conn {
>>      /* Mutable data. */
>>      bool seq_skew_dir; /* TCP sequence skew direction due to NATTing of FTP
>>                          * control messages; true if reply direction. */
>> -    bool cleaned; /* True if cleaned from expiry lists. */
>> +    atomic_flag cleaned; /* True if the entry was stale and one of the
>> +                          * cleaner (i.e. packet path or sweeper) took
>> +                          * charge of it. */
>> 
>>      /* Immutable data. */
>>      bool alg_related; /* True if alg data connection. */
>> @@ -192,10 +194,25 @@ enum ct_timeout {
>>      N_CT_TM
>>  };
>> 
>> -struct conntrack {
>> -    struct ovs_mutex ct_lock; /* Protects 2 following fields. */
>> +#define CONNTRACK_BUCKETS_SHIFT 10
>> +#define CONNTRACK_BUCKETS (1 << CONNTRACK_BUCKETS_SHIFT)
>> +
>> +struct ct_bucket {
>> +    /* Protects 'conns'. In case of natted conns, there's a high
>> +     * chance that the forward and the reverse key stand in different
>> +     * buckets. buckets_lock() should be the preferred way to acquire
>> +     * these locks (unless otherwise needed), as it deals with the
>> +     * acquisition order. */
>> +    struct ovs_mutex lock;
>> +    /* Contains the connections in the bucket, indexed by
>> +     * 'struct conn_key'. */
>>      struct cmap conns OVS_GUARDED;
>> -    struct ovs_list exp_lists[N_CT_TM] OVS_GUARDED;
>> +};
>> +
>> +struct conntrack {
>> +    struct ct_bucket buckets[CONNTRACK_BUCKETS];
>> +    unsigned int next_bucket;
>> +    struct ovs_mutex ct_lock;
>>      struct cmap zone_limits OVS_GUARDED;
>>      struct cmap timeout_policies OVS_GUARDED;
>>      uint32_t hash_basis; /* Salt for hashing a connection key. */
>> @@ -220,9 +237,10 @@ struct conntrack {
>>  };
>> 
>>  /* Lock acquisition order:
>> - *    1. 'ct_lock'
>> - *    2. 'conn->lock'
>> - *    3. 'resources_lock'
>> + *    1. 'buckets[p1]->lock'
>> + *    2  'buckets[p2]->lock' (with p1 < p2)
>> + *    3. 'conn->lock'
>> + *    4. 'resources_lock'
>>   */
>> 
>>  extern struct ct_l4_proto ct_proto_tcp;
>> diff --git a/lib/conntrack-tp.c b/lib/conntrack-tp.c
>> index 9ecb06978..117810528 100644
>> --- a/lib/conntrack-tp.c
>> +++ b/lib/conntrack-tp.c
>> @@ -236,27 +236,6 @@ tm_to_ct_dpif_tp(enum ct_timeout tm)
>>      return CT_DPIF_TP_ATTR_MAX;
>>  }
>> 
>> -static void
>> -conn_update_expiration__(struct conntrack *ct, struct conn *conn,
>> -                         enum ct_timeout tm, long long now,
>> -                         uint32_t tp_value)
>> -    OVS_REQUIRES(conn->lock)
>> -{
>> -    ovs_mutex_unlock(&conn->lock);
>> -
>> -    ovs_mutex_lock(&ct->ct_lock);
>> -    ovs_mutex_lock(&conn->lock);
>> -    if (!conn->cleaned) {
>> -        conn->expiration = now + tp_value * 1000;
>> -        ovs_list_remove(&conn->exp_node);
>> -        ovs_list_push_back(&ct->exp_lists[tm], &conn->exp_node);
>> -    }
>> -    ovs_mutex_unlock(&conn->lock);
>> -    ovs_mutex_unlock(&ct->ct_lock);
>> -
>> -    ovs_mutex_lock(&conn->lock);
>> -}
>> -
>>  /* The conn entry lock must be held on entry and exit. */
>>  void
>>  conn_update_expiration(struct conntrack *ct, struct conn *conn,
>> @@ -266,42 +245,25 @@ conn_update_expiration(struct conntrack *ct, struct conn *conn,
>>      struct timeout_policy *tp;
>>      uint32_t val;
>> 
>> -    ovs_mutex_unlock(&conn->lock);
>> -
>> -    ovs_mutex_lock(&ct->ct_lock);
>> -    ovs_mutex_lock(&conn->lock);
>>      tp = timeout_policy_lookup(ct, conn->tp_id);
>>      if (tp) {
>>          val = tp->policy.attrs[tm_to_ct_dpif_tp(tm)];
>>      } else {
>>          val = ct_dpif_netdev_tp_def[tm_to_ct_dpif_tp(tm)];
>>      }
>> -    ovs_mutex_unlock(&conn->lock);
>> -    ovs_mutex_unlock(&ct->ct_lock);
>> 
>> -    ovs_mutex_lock(&conn->lock);
>>      VLOG_DBG_RL(&rl, "Update timeout %s zone=%u with policy id=%d "
>>                  "val=%u sec.",
>>                  ct_timeout_str[tm], conn->key_node[CT_DIR_FWD].key.zone,
>>                  conn->tp_id, val);
>> 
>> -    conn_update_expiration__(ct, conn, tm, now, val);
>> -}
>> -
>> -static void
>> -conn_init_expiration__(struct conntrack *ct, struct conn *conn,
>> -                       enum ct_timeout tm, long long now,
>> -                       uint32_t tp_value)
>> -{
>> -    conn->expiration = now + tp_value * 1000;
>> -    ovs_list_push_back(&ct->exp_lists[tm], &conn->exp_node);
>> +    conn->expiration = now + val * 1000;
>>  }
>> 
>>  /* ct_lock must be held. */
>>  void
>>  conn_init_expiration(struct conntrack *ct, struct conn *conn,
>>                       enum ct_timeout tm, long long now)
>> -    OVS_REQUIRES(ct->ct_lock)
>>  {
>>      struct timeout_policy *tp;
>>      uint32_t val;
>> @@ -317,5 +279,5 @@ conn_init_expiration(struct conntrack *ct, struct conn *conn,
>>                  ct_timeout_str[tm], conn->key_node[CT_DIR_FWD].key.zone,
>>                  conn->tp_id, val);
>> 
>> -    conn_init_expiration__(ct, conn, tm, now, val);
>> +    conn->expiration = now + val * 1000;
>>  }
>> diff --git a/lib/conntrack.c b/lib/conntrack.c
>> index a284c57c0..1c019af29 100644
>> --- a/lib/conntrack.c
>> +++ b/lib/conntrack.c
>
> At the beginning of the file, there is a remaining reference to a stats
> that you otherwise removed:
>
> @@ -44,7 +44,6 @@
>  VLOG_DEFINE_THIS_MODULE(conntrack);
>
>  COVERAGE_DEFINE(conntrack_full);
> -COVERAGE_DEFINE(conntrack_long_cleanup);
>  COVERAGE_DEFINE(conntrack_l3csum_err);
>  COVERAGE_DEFINE(conntrack_l4csum_err);
>  COVERAGE_DEFINE(conntrack_lookup_natted_miss);
> @@ -560,6 +559,7 @@ conn_out_found:
>

good catch

>> @@ -85,9 +85,12 @@ struct zone_limit {
>>      struct conntrack_zone_limit czl;
>>  };
>> 
>> +static unsigned hash_scale(uint32_t hash);
>> +static void conn_clean(struct conntrack *ct, struct conn *conn);
>>  static bool conn_key_extract(struct conntrack *, struct dp_packet *,
>>                               ovs_be16 dl_type, struct conn_lookup_ctx *,
>>                               uint16_t zone);
>> +static uint32_t cached_key_hash(struct conn_key_node *n);
>
> This function made me think something was being computed,
> while it's only reading the key_node field. I think it's worth
> just directly accessing it.
>

ACK
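
For reference, a minimal sketch of what the direct access could look
like at the conn_clean__() call sites, using the 'key_hash' field this
patch adds to struct conn_key_node:

    hash = conn->key_node[CT_DIR_FWD].key_hash;
    bucket = &ct->buckets[hash_scale(hash)];
    cmap_remove(&bucket->conns, &conn->key_node[CT_DIR_FWD].cm_node, hash);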

>>  static uint32_t conn_key_hash(const struct conn_key *, uint32_t basis);
>>  static void conn_key_reverse(struct conn_key *);
>>  static bool valid_new(struct dp_packet *pkt, struct conn_key *);
>> @@ -109,8 +112,9 @@ static void set_label(struct dp_packet *, struct conn *,
>>  static void *clean_thread_main(void *f_);
>> 
>>  static bool
>> -nat_get_unique_tuple(struct conntrack *ct, struct conn *conn,
>> -                     const struct nat_action_info_t *nat_info);
>> +nat_get_unique_tuple_lock(struct conntrack *ct, struct conn *conn,
>> +                          const struct nat_action_info_t *nat_info,
>> +                          uint32_t *rev_hash);
>> 
>>  static uint8_t
>>  reverse_icmp_type(uint8_t type);
>> @@ -249,16 +253,17 @@ conntrack_init(void)
>>      ovs_rwlock_unlock(&ct->resources_lock);
>> 
>>      ovs_mutex_init_adaptive(&ct->ct_lock);
>> -    ovs_mutex_lock(&ct->ct_lock);
>> -    cmap_init(&ct->conns);
>> -    for (unsigned i = 0; i < ARRAY_SIZE(ct->exp_lists); i++) {
>> -        ovs_list_init(&ct->exp_lists[i]);
>> +
>> +    ct->next_bucket = 0;
>> +    for (unsigned i = 0; i < CONNTRACK_BUCKETS; i++) {
>> +        struct ct_bucket *bucket = &ct->buckets[i];
>> +        cmap_init(&bucket->conns);
>> +        ovs_mutex_init_recursive(&bucket->lock);
>
> Does it need to be recursive? How does it happen?
> I think this mutex could be made adaptive, it might be worth profiling contention
> and checking if it helps or not.
>

There's one case:

nat_get_unique_tuple_lock() performs a lookup with the bucket lock(s)
already held, and that lookup may in turn call conn_clean(), which
acquires the same bucket lock again.
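
To make the recursion explicit, this is the call chain as I read it
(sketch only, function names as in this patch):

    nat_get_unique_tuple_lock()
      -> buckets_lock(ct, key_hash, *rev_hash)   /* bucket lock taken */
      -> conn_lookup()
         -> conn_key_lookup()
            -> conn_clean()                      /* on an expired entry */
               -> buckets_lock(ct, h1, h2)       /* same bucket, again */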

>>      }
>> +
>>      cmap_init(&ct->zone_limits);
>>      ct->zone_limit_seq = 0;
>>      timeout_policy_init(ct);
>> -    ovs_mutex_unlock(&ct->ct_lock);
>> -
>>      atomic_count_init(&ct->n_conn, 0);
>>      atomic_init(&ct->n_conn_limit, DEFAULT_N_CONN_LIMIT);
>>      atomic_init(&ct->tcp_seq_chk, true);
>> @@ -410,9 +415,9 @@ zone_limit_delete(struct conntrack *ct, uint16_t zone)
>>  }
>> 
>>  static void
>> -conn_clean(struct conntrack *ct, struct conn *conn)
>> -    OVS_REQUIRES(ct->ct_lock)
>> +conn_clean__(struct conntrack *ct, struct conn *conn)
>>  {
>> +    struct ct_bucket *bucket;
>>      struct zone_limit *zl;
>>      uint32_t hash;
>> 
>> @@ -420,8 +425,9 @@ conn_clean(struct conntrack *ct, struct conn *conn)
>>          expectation_clean(ct, &conn->key_node[CT_DIR_FWD].key);
>>      }
>> 
>> -    hash = conn_key_hash(&conn->key_node[CT_DIR_FWD].key, ct->hash_basis);
>> -    cmap_remove(&ct->conns, &conn->key_node[CT_DIR_FWD].cm_node, hash);
>> +    hash = cached_key_hash(&conn->key_node[CT_DIR_FWD]);
>> +    bucket = &ct->buckets[hash_scale(hash)];
>> +    cmap_remove(&bucket->conns, &conn->key_node[CT_DIR_FWD].cm_node, hash);
>> 
>>      zl = zone_limit_lookup(ct, conn->admit_zone);
>>      if (zl && zl->czl.zone_limit_seq == conn->zone_limit_seq) {
>> @@ -429,12 +435,10 @@ conn_clean(struct conntrack *ct, struct conn *conn)
>>      }
>> 
>>      if (conn->nat_action) {
>> -        hash = conn_key_hash(&conn->key_node[CT_DIR_REV].key,
>> -                             ct->hash_basis);
>> -        cmap_remove(&ct->conns, &conn->key_node[CT_DIR_REV].cm_node, hash);
>> +        hash = cached_key_hash(&conn->key_node[CT_DIR_REV]);
>> +        bucket = &ct->buckets[hash_scale(hash)];
>> +        cmap_remove(&bucket->conns, &conn->key_node[CT_DIR_REV].cm_node, hash);
>>      }
>> -    ovs_list_remove(&conn->exp_node);
>> -    conn->cleaned = true;
>>      ovsrcu_postpone(delete_conn, conn);
>>      atomic_count_dec(&ct->n_conn);
>>  }
>> @@ -446,22 +450,35 @@ void
>>  conntrack_destroy(struct conntrack *ct)
>>  {
>>      struct conn_key_node *keyn;
>> +    struct ct_bucket *bucket;
>>      struct conn *conn;
>> +    int i;
>> 
>>      latch_set(&ct->clean_thread_exit);
>>      pthread_join(ct->clean_thread, NULL);
>>      latch_destroy(&ct->clean_thread_exit);
>> 
>> -    ovs_mutex_lock(&ct->ct_lock);
>> -    CMAP_FOR_EACH (keyn, cm_node, &ct->conns) {
>> -        if (keyn->key.dir != CT_DIR_FWD) {
>> -            continue;
>> +    for (i = 0; i < CONNTRACK_BUCKETS; i++) {
>> +        bucket = &ct->buckets[i];
>> +        CMAP_FOR_EACH (keyn, cm_node, &bucket->conns) {
>> +            if (keyn->key.dir != CT_DIR_FWD) {
>> +                continue;
>> +            }
>> +            conn = CONTAINER_OF(keyn, struct conn, key_node[keyn->key.dir]);
>> +            conn_clean(ct, conn);
>>          }
>> -        conn = CONTAINER_OF(keyn, struct conn, key_node[keyn->key.dir]);
>> -        conn_clean(ct, conn);
>>      }
>> -    cmap_destroy(&ct->conns);
>> 
>> +    /* XXX: we need this loop because connections may be in multiple
>> +     * buckets.  The former loop should probably use conn_clean__()
>> +     * or an unlocked version of conn_clean(). */
>> +    for (i = 0; i < CONNTRACK_BUCKETS; i++) {
>> +        bucket = &ct->buckets[i];
>> +        ovs_mutex_destroy(&bucket->lock);
>> +        cmap_destroy(&ct->buckets[i].conns);
>> +    }
>> +
>> +    ovs_mutex_lock(&ct->ct_lock);
>>      struct zone_limit *zl;
>>      CMAP_FOR_EACH (zl, node, &ct->zone_limits) {
>>          uint32_t hash = zone_key_hash(zl->czl.zone, ct->hash_basis);
>> @@ -498,45 +515,108 @@ conntrack_destroy(struct conntrack *ct)
>>  }
>> 
>> 
>> +static unsigned hash_scale(uint32_t hash)
>> +{
>> +    return (hash >> (32 - CONNTRACK_BUCKETS_SHIFT)) % CONNTRACK_BUCKETS;
>> +}
>> +
>
> I am not sure I understand this.
>
> You have a hash of width 32 bits.
> You shift right by 22 bits (10 bits remaining).
> Then you use modulo with a power of two, effectively doing bitwise & on the
> 10 LSBs, which seems redundant at this point.
>
> Is there a reason to prefer the MSBs to the LSBs? Is there a specific property of the hash function that requires it?
>

Well, this is something I carried over from the old bucket
implementation, with the idea of replacing it if it turned out to need
improvement, but from my tests it seemed ok.
I agree the modulo is redundant, especially since it is tied to
BUCKETS_SHIFT. I overlooked it while reusing that piece of code, but we
can remove it.

I found the comment that explains the reason for using the MSBs in the
patch that introduced it:

/* Extracts the most significant bits in hash. The least significant bits
 * are already used internally by the hmap implementation. */
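
Just to make the simplification concrete, a possible form without the
modulo (sketch only, reusing CONNTRACK_BUCKETS_SHIFT from this patch;
the shift alone already yields a value in [0, CONNTRACK_BUCKETS)):

    static unsigned
    hash_scale(uint32_t hash)
    {
        /* Keep the most significant bits; the least significant ones
         * are already consumed internally by the underlying map. */
        return hash >> (32 - CONNTRACK_BUCKETS_SHIFT);
    }

If the number of buckets changes later, only CONNTRACK_BUCKETS_SHIFT
needs to change.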

>>  static bool
>> -conn_key_lookup(struct conntrack *ct, const struct conn_key *key,
>> -                uint32_t hash, long long now, struct conn **conn_out,
>> +conn_key_lookup(struct conntrack *ct, unsigned bucket,
>> +                const struct conn_key *key, uint32_t hash,
>> +                long long now, struct conn **conn_out,
>>                  bool *reply)
>>  {
>> +    struct ct_bucket *ctb = &ct->buckets[bucket];
>>      struct conn_key_node *keyn;
>> -    struct conn *conn = NULL;
>>      bool found = false;
>> +    struct conn *conn;
>> 
>> -    CMAP_FOR_EACH_WITH_HASH (keyn, cm_node, hash, &ct->conns) {
>> +    CMAP_FOR_EACH_WITH_HASH (keyn, cm_node, hash, &ctb->conns) {
>>          conn = CONTAINER_OF(keyn, struct conn, key_node[keyn->key.dir]);
>> +        if (conn_expired(conn, now)) {
>> +            conn_clean(ct, conn);
>> +            continue;
>> +        }
>> +
>>          for (int i = CT_DIR_FWD; i < CT_DIR_MAX; i++) {
>> -            if (!conn_key_cmp(&conn->key_node[i].key, key) &&
>> -                !conn_expired(conn, now)) {
>> +            if (!conn_key_cmp(&conn->key_node[i].key, key)) {
>>                  found = true;
>>                  if (reply) {
>>                      *reply = i;
>>                  }
>> -                goto out_found;
>> +
>> +                goto conn_out_found;
>>              }
>>          }
>>      }
>> 
>> -out_found:
>> -    if (found && conn_out) {
>> -        *conn_out = conn;
>> -    } else if (conn_out) {
>> -        *conn_out = NULL;
>> +conn_out_found:
>> +    if (conn_out) {
>> +        *conn_out = found ? conn : NULL;
>>      }
>> 
>>      return found;
>>  }
>> 
>> +static void
>> +buckets_unlock(struct conntrack *ct, uint32_t h1, uint32_t h2)
>
> ct_buckets_unlock?

ACK

>
>> +{
>> +    unsigned p1 = hash_scale(h1),
>> +        p2 = hash_scale(h2);
>> +
>> +    if (p1 > p2) {
>
> Keeping the comparison the same as in buckets_lock permits to see at a glance
> that the locking order is the reverse of their precedence for unlock, which
> IMO helps seeing that it is correctly done.
>
> So I think (p1 < p2) there and then swapping p1 and p2 below would be easier to read.
>

ACK
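
To make sure I read the suggestion correctly, a sketch of the unlock
side with the comparison mirroring buckets_lock(), releasing in reverse
acquisition order (using the ct_buckets_unlock name suggested above):

    static void
    ct_buckets_unlock(struct conntrack *ct, uint32_t h1, uint32_t h2)
    {
        unsigned p1 = hash_scale(h1), p2 = hash_scale(h2);

        if (p1 < p2) {
            /* buckets_lock() took p1 first, so release p2 first. */
            ovs_mutex_unlock(&ct->buckets[p2].lock);
            ovs_mutex_unlock(&ct->buckets[p1].lock);
        } else if (p1 > p2) {
            ovs_mutex_unlock(&ct->buckets[p1].lock);
            ovs_mutex_unlock(&ct->buckets[p2].lock);
        } else {
            ovs_mutex_unlock(&ct->buckets[p1].lock);
        }
    }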

>> +        ovs_mutex_unlock(&ct->buckets[p1].lock);
>> +        ovs_mutex_unlock(&ct->buckets[p2].lock);
>> +    } else if (p1 < p2) {
>> +        ovs_mutex_unlock(&ct->buckets[p2].lock);
>> +        ovs_mutex_unlock(&ct->buckets[p1].lock);
>> +    } else {
>> +        ovs_mutex_unlock(&ct->buckets[p1].lock);
>> +    }
>> +}
>> +
>> +/* Acquires both locks in an ordered way. */
>> +static void
>> +buckets_lock(struct conntrack *ct, uint32_t h1, uint32_t h2)
>> +{
>> +    unsigned p1 = hash_scale(h1),
>> +        p2 = hash_scale(h2);
>> +
>> +    if (p1 < p2) {
>> +        ovs_mutex_lock(&ct->buckets[p1].lock);
>> +        ovs_mutex_lock(&ct->buckets[p2].lock);
>> +    } else if (p1 > p2) {
>> +        ovs_mutex_lock(&ct->buckets[p2].lock);
>> +        ovs_mutex_lock(&ct->buckets[p1].lock);
>> +    } else {
>> +        ovs_mutex_lock(&ct->buckets[p1].lock);
>> +    }
>> +}
>> +
>> +static void
>> +conn_clean(struct conntrack *ct, struct conn *conn)
>> +{
>> +    uint32_t h1, h2;
>> +
>> +    if (atomic_flag_test_and_set(&conn->cleaned)) {
>> +        return;
>> +    }
>> +
>> +    h1 = cached_key_hash(&conn->key_node[CT_DIR_FWD]);
>> +    h2 = cached_key_hash(&conn->key_node[CT_DIR_REV]);
>> +    buckets_lock(ct, h1, h2);
>> +    conn_clean__(ct, conn);
>> +    buckets_unlock(ct, h1, h2);
>> +}
>> +
>>  static bool
>>  conn_lookup(struct conntrack *ct, const struct conn_key *key,
>>              long long now, struct conn **conn_out, bool *reply)
>>  {
>>      uint32_t hash = conn_key_hash(key, ct->hash_basis);
>> -    return conn_key_lookup(ct, key, hash, now, conn_out, reply);
>> +    unsigned bucket = hash_scale(hash);
>> +
>> +    return conn_key_lookup(ct, bucket, key, hash, now, conn_out, reply);
>>  }
>> 
>>  static void
>> @@ -944,7 +1024,6 @@ conn_not_found(struct conntrack *ct, struct dp_packet *pkt,
>>                 const struct nat_action_info_t *nat_action_info,
>>                 const char *helper, const struct alg_exp_node *alg_exp,
>>                 enum ct_alg_ctl_type ct_alg_ctl, uint32_t tp_id)
>> -    OVS_REQUIRES(ct->ct_lock)
>>  {
>>      struct conn *nc = NULL;
>>      uint32_t rev_hash = ctx->hash;
>> @@ -954,6 +1033,8 @@ conn_not_found(struct conntrack *ct, struct dp_packet *pkt,
>>          return nc;
>>      }
>> 
>> +    /* XXX: We are unlocked here, so we don't know
>> +     * if the tuple already exists in the table. */
>
> Can you expand more on the issue here?
> Shouldn't we be locked for the lookup?
>

We should; I think that's a leftover. I'm going to remove it.

>>      pkt->md.ct_state = CS_NEW;
>> 
>>      if (alg_exp) {
>> @@ -961,10 +1042,11 @@ conn_not_found(struct conntrack *ct, struct dp_packet *pkt,
>>      }
>> 
>>      if (commit) {
>> -        struct conn_key_node *fwd_key_node, *rev_key_node;
>> -
>>          struct zone_limit *zl = zone_limit_lookup_or_default(ct,
>>                                                               ctx->key.zone);
>> +        struct conn_key_node *fwd_key_node, *rev_key_node;
>> +        bool handle_tuple = false;
>> +
>>          if (zl && atomic_count_get(&zl->czl.count) >= zl->czl.limit) {
>>              return nc;
>>          }
>> @@ -1007,22 +1089,40 @@ conn_not_found(struct conntrack *ct, struct dp_packet *pkt,
>>                      nc->nat_action = NAT_ACTION_DST;
>>                  }
>>              } else {
>> -                bool nat_res = nat_get_unique_tuple(ct, nc, nat_action_info);
>> -
>> -                if (!nat_res) {
>> -                    goto nat_res_exhaustion;
>> -                }
>> +                handle_tuple = true;
>>              }
>> -
>> -            nat_packet(pkt, nc, ctx->icmp_related);
>> -            rev_hash = conn_key_hash(&rev_key_node->key, ct->hash_basis);
>> -            rev_key_node->key.dir = CT_DIR_REV;
>> -            cmap_insert(&ct->conns, &rev_key_node->cm_node, rev_hash);
>>          }
>> 
>>          ovs_mutex_init_adaptive(&nc->lock);
>> +        atomic_flag_clear(&nc->cleaned);
>>          fwd_key_node->key.dir = CT_DIR_FWD;
>> -        cmap_insert(&ct->conns, &fwd_key_node->cm_node, ctx->hash);
>> +        rev_key_node->key.dir = CT_DIR_REV;
>> +
>> +        if (handle_tuple) {
>> +            bool nat_res = nat_get_unique_tuple_lock(ct, nc, nat_action_info,
>> +                                                     &rev_hash);
>> +
>> +            if (!nat_res) {
>> +                goto out_error;
>> +            }
>> +        } else {
>> +            rev_hash = conn_key_hash(&rev_key_node->key, ct->hash_basis);
>> +            buckets_lock(ct, ctx->hash, rev_hash);
>> +        }
>> +
>> +        if (conn_lookup(ct, &ctx->key, now, NULL, NULL)) {
>> +            goto out_error_unlock;
>> +        }
>> +
>> +        fwd_key_node->key_hash = ctx->hash;
>> +        cmap_insert(&ct->buckets[hash_scale(ctx->hash)].conns,
>> +                    &fwd_key_node->cm_node, ctx->hash);
>> +        if (nat_action_info) {
>> +            rev_key_node->key_hash = rev_hash;
>> +            cmap_insert(&ct->buckets[hash_scale(rev_hash)].conns,
>> +                        &rev_key_node->cm_node, rev_hash);
>> +            nat_packet(pkt, nc, ctx->icmp_related);
>> +        }
>> 
>>          atomic_count_inc(&ct->n_conn);
>>          ctx->conn = nc; /* For completeness. */
>> @@ -1033,21 +1133,23 @@ conn_not_found(struct conntrack *ct, struct dp_packet *pkt,
>>          } else {
>>              nc->admit_zone = INVALID_ZONE;
>>          }
>> +        buckets_unlock(ct, ctx->hash, rev_hash);
>>      }
>> 
>>      return nc;
>> 
>> +out_error_unlock:
>> +    buckets_unlock(ct, ctx->hash, rev_hash);
>>      /* This would be a user error or a DOS attack.  A user error is prevented
>>       * by allocating enough combinations of NAT addresses when combined with
>>       * ephemeral ports.  A DOS attack should be protected against with
>>       * firewall rules or a separate firewall.  Also using zone partitioning
>>       * can limit DoS impact. */
>> -nat_res_exhaustion:
>> -    ovs_list_remove(&nc->exp_node);
>> +out_error:
>> +    ovs_mutex_destroy(&nc->lock);
>>      delete_conn_cmn(nc);
>>      static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
>> -    VLOG_WARN_RL(&rl, "Unable to NAT due to tuple space exhaustion - "
>> -                 "if DoS attack, use firewalling and/or zone partitioning.");
>> +    VLOG_WARN_RL(&rl, "Unable to insert a new connection.");
>>      return NULL;
>>  }
>> 
>> @@ -1082,12 +1184,7 @@ conn_update_state(struct conntrack *ct, struct dp_packet *pkt,
>>              pkt->md.ct_state = CS_INVALID;
>>              break;
>>          case CT_UPDATE_NEW:
>> -            ovs_mutex_lock(&ct->ct_lock);
>> -            if (conn_lookup(ct, &conn->key_node[CT_DIR_FWD].key,
>> -                            now, NULL, NULL)) {
>> -                conn_clean(ct, conn);
>> -            }
>> -            ovs_mutex_unlock(&ct->ct_lock);
>> +            conn_clean(ct, conn);
>>              create_new_conn = true;
>>              break;
>>          case CT_UPDATE_VALID_NEW:
>> @@ -1253,6 +1350,8 @@ static void
>>  initial_conn_lookup(struct conntrack *ct, struct conn_lookup_ctx *ctx,
>>                      long long now, bool natted)
>>  {
>> +    unsigned bucket = hash_scale(ctx->hash);
>> +
>>      if (natted) {
>>          /* If the packet has been already natted (e.g. a previous
>>           * action took place), retrieve it performing a lookup of its
>> @@ -1260,7 +1359,8 @@ initial_conn_lookup(struct conntrack *ct, struct conn_lookup_ctx *ctx,
>>          conn_key_reverse(&ctx->key);
>>      }
>> 
>> -    conn_key_lookup(ct, &ctx->key, ctx->hash, now, &ctx->conn, &ctx->reply);
>> +    conn_key_lookup(ct, bucket, &ctx->key, ctx->hash,
>> +                    now, &ctx->conn, &ctx->reply);
>> 
>>      if (natted) {
>>          if (OVS_LIKELY(ctx->conn)) {
>> @@ -1287,24 +1387,20 @@ process_one(struct conntrack *ct, struct dp_packet *pkt,
>>              ovs_be16 tp_src, ovs_be16 tp_dst, const char *helper,
>>              uint32_t tp_id)
>>  {
>> +    bool create_new_conn = false;
>> +
>>      /* Reset ct_state whenever entering a new zone. */
>>      if (pkt->md.ct_state && pkt->md.ct_zone != zone) {
>>          pkt->md.ct_state = 0;
>>      }
>> 
>> -    bool create_new_conn = false;
>>      initial_conn_lookup(ct, ctx, now, !!(pkt->md.ct_state &
>>                                           (CS_SRC_NAT | CS_DST_NAT)));
>>      struct conn *conn = ctx->conn;
>> 
>>      /* Delete found entry if in wrong direction. 'force' implies commit. */
>>      if (OVS_UNLIKELY(force && ctx->reply && conn)) {
>> -        ovs_mutex_lock(&ct->ct_lock);
>> -        if (conn_lookup(ct, &conn->key_node[CT_DIR_FWD].key,
>> -                        now, NULL, NULL)) {
>> -            conn_clean(ct, conn);
>> -        }
>> -        ovs_mutex_unlock(&ct->ct_lock);
>> +        conn_clean(ct, conn);
>>          conn = NULL;
>>      }
>> 
>> @@ -1338,7 +1434,6 @@ process_one(struct conntrack *ct, struct dp_packet *pkt,
>>      struct alg_exp_node alg_exp_entry;
>> 
>>      if (OVS_UNLIKELY(create_new_conn)) {
>> -
>>          ovs_rwlock_rdlock(&ct->resources_lock);
>>          alg_exp = expectation_lookup(&ct->alg_expectations, &ctx->key,
>>                                       ct->hash_basis,
>> @@ -1349,12 +1444,9 @@ process_one(struct conntrack *ct, struct dp_packet *pkt,
>>          }
>>          ovs_rwlock_unlock(&ct->resources_lock);
>> 
>> -        ovs_mutex_lock(&ct->ct_lock);
>> -        if (!conn_lookup(ct, &ctx->key, now, NULL, NULL)) {
>> -            conn = conn_not_found(ct, pkt, ctx, commit, now, nat_action_info,
>> -                                  helper, alg_exp, ct_alg_ctl, tp_id);
>> -        }
>> -        ovs_mutex_unlock(&ct->ct_lock);
>> +        conn = conn_not_found(ct, pkt, ctx, commit, now,
>> +                              nat_action_info, helper, alg_exp,
>> +                              ct_alg_ctl, tp_id);
>>      }
>> 
>>      write_ct_md(pkt, zone, conn, &ctx->key, alg_exp);
>> @@ -1467,83 +1559,92 @@ set_label(struct dp_packet *pkt, struct conn *conn,
>>  }
>> 
>> 
>> -/* Delete the expired connections from 'ctb', up to 'limit'. Returns the
>> - * earliest expiration time among the remaining connections in 'ctb'.  Returns
>> - * LLONG_MAX if 'ctb' is empty.  The return value might be smaller than 'now',
>> - * if 'limit' is reached */
>> +/* Deletes the expired connections found in 'bucket'.
>> + * Returns the number of forward-direction connections
>> + * examined in 'bucket', including the ones that were
>> + * expired and cleaned.  The caller uses this count to
>> + * enforce its per-iteration scanning budget. */
>>  static long long
>> -ct_sweep(struct conntrack *ct, long long now, size_t limit)
>> +sweep_bucket(struct conntrack *ct, struct ct_bucket *bucket,
>> +             long long now)
>>  {
>> -    struct conn *conn, *next;
>> -    long long min_expiration = LLONG_MAX;
>> -    size_t count = 0;
>> +    struct conn_key_node *keyn;
>> +    unsigned int conn_count = 0;
>> +    struct conn *conn;
>> +    long long expiration;
>> 
>> -    ovs_mutex_lock(&ct->ct_lock);
>> +    CMAP_FOR_EACH (keyn, cm_node, &bucket->conns) {
>> +        if (keyn->key.dir != CT_DIR_FWD) {
>> +            continue;
>> +        }
>> 
>> -    for (unsigned i = 0; i < N_CT_TM; i++) {
>> -        LIST_FOR_EACH_SAFE (conn, next, exp_node, &ct->exp_lists[i]) {
>> -            ovs_mutex_lock(&conn->lock);
>> -            if (now < conn->expiration || count >= limit) {
>> -                min_expiration = MIN(min_expiration, conn->expiration);
>> -                ovs_mutex_unlock(&conn->lock);
>> -                if (count >= limit) {
>> -                    /* Do not check other lists. */
>> -                    COVERAGE_INC(conntrack_long_cleanup);
>> -                    goto out;
>> -                }
>> -                break;
>> -            } else {
>> -                ovs_mutex_unlock(&conn->lock);
>> -                conn_clean(ct, conn);
>> -            }
>> -            count++;
>> +        conn = CONTAINER_OF(keyn, struct conn, key_node[keyn->key.dir]);
>> +        ovs_mutex_lock(&conn->lock);
>> +        expiration = conn->expiration;
>> +        ovs_mutex_unlock(&conn->lock);
>
> Atomic expiration might be worth considering as well.
>

yes, I agree
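
A rough sketch of what that could look like (assuming 'expiration' in
struct conn becomes an atomic_llong; not part of this patch):

    /* Writer side, e.g. conn_init_expiration(): */
    atomic_store_relaxed(&conn->expiration, now + val * 1000);

    /* Reader side in sweep_bucket(), without taking conn->lock: */
    long long expiration;

    atomic_read_relaxed(&conn->expiration, &expiration);
    if (now >= expiration) {
        conn_clean(ct, conn);
    }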

>> +
>> +        if (now >= expiration) {
>> +            conn_clean(ct, conn);
>>          }
>> +
>> +        conn_count++;
>>      }
>> 
>> -out:
>> -    VLOG_DBG("conntrack cleanup %"PRIuSIZE" entries in %lld msec", count,
>> -             time_msec() - now);
>> -    ovs_mutex_unlock(&ct->ct_lock);
>> -    return min_expiration;
>> +    return conn_count;
>>  }
>> 
>> -/* Cleans up old connection entries from 'ct'.  Returns the time when the
>> - * next expiration might happen.  The return value might be smaller than
>> - * 'now', meaning that an internal limit has been reached, and some expired
>> - * connections have not been deleted. */
>> +/* Cleans up old connection entries from 'ct'.  Returns the next
>> + * wake up time.  The return value might be smaller than 'now', meaning
>> + * that an internal limit has been reached, that is, the table
>> + * hasn't been entirely scanned. */
>>  static long long
>>  conntrack_clean(struct conntrack *ct, long long now)
>>  {
>> -    unsigned int n_conn_limit;
>> +    long long next_wakeup = now + 90 * 1000;
>> +    unsigned int n_conn_limit, i, count = 0;
>> +    size_t clean_end;
>> +
>>      atomic_read_relaxed(&ct->n_conn_limit, &n_conn_limit);
>> -    size_t clean_max = n_conn_limit > 10 ? n_conn_limit / 10 : 1;
>> -    long long min_exp = ct_sweep(ct, now, clean_max);
>> -    long long next_wakeup = MIN(min_exp, now + CT_DPIF_NETDEV_TP_MIN);
>> +    clean_end = n_conn_limit / 64;
>
> I don't know if the change in heuristics is better or not here.
> Which metrics should be considered relevant? CPU load? Latency?
>

The idea was CPU load. This part is particularly open to any
suggestions/discussions, as I consider it the most RFC part of the
patch.

>> +
>> +    for (i = ct->next_bucket; i < CONNTRACK_BUCKETS; i++) {
>> +        struct ct_bucket *bucket = &ct->buckets[i];
>> +
>> +        count += sweep_bucket(ct, bucket, now);
>> +
>> +        if (count > clean_end) {
>> +            next_wakeup = 0;
>> +            break;
>> +        }
>> +    }
>> +
>> +    ct->next_bucket = (i < CONNTRACK_BUCKETS) ? i : 0;
>> 
>>      return next_wakeup;
>>  }
>> 
>>  /* Cleanup:
>>   *
>> - * We must call conntrack_clean() periodically.  conntrack_clean() return
>> - * value gives an hint on when the next cleanup must be done (either because
>> - * there is an actual connection that expires, or because a new connection
>> - * might be created with the minimum timeout).
>> + * We must call conntrack_clean() periodically.  conntrack_clean()
>> + * return value gives a hint on when the next cleanup must be done
>> + * (either because there is still work to do, or because a new
>> + * connection might be created).
>>   *
>>   * The logic below has two goals:
>>   *
>> - * - We want to reduce the number of wakeups and batch connection cleanup
>> - *   when the load is not very high.  CT_CLEAN_INTERVAL ensures that if we
>> - *   are coping with the current cleanup tasks, then we wait at least
>> - *   5 seconds to do further cleanup.
>> + * - When the load is high, we want to avoid hogging the CPU by scanning
>> + *   all the buckets and their respective CMAPs "at once". For this
>> + *   reason, every batch cleanup aims to scan at most n_conn_limit /
>> + *   64 entries (more if a bucket contains many entries) before
>> + *   yielding the CPU. In this case, the next wake up will happen in
>> + *   CT_CLEAN_MIN_INTERVAL_MS and the scan will resume starting from
>> + *   the first bucket not scanned.
>>   *
>> - * - We don't want to keep the map locked too long, as we might prevent
>> - *   traffic from flowing.  CT_CLEAN_MIN_INTERVAL ensures that if cleanup is
>> - *   behind, there is at least some 200ms blocks of time when the map will be
>> - *   left alone, so the datapath can operate unhindered.
>> - */
>> -#define CT_CLEAN_INTERVAL 5000 /* 5 seconds */
>> -#define CT_CLEAN_MIN_INTERVAL 200  /* 0.2 seconds */
>> + * - We also don't want to scan the buckets so frequently, as going
>> + *   through all the connections, during high loads, may be costly in
>> + *   terms of CPU time. In this case the next wake up is set to 90
>> + *   seconds. */
>> +#define CT_CLEAN_MIN_INTERVAL_MS 100  /* 0.1 seconds */
>> 
>>  static void *
>>  clean_thread_main(void *f_)
>> @@ -1556,9 +1657,9 @@ clean_thread_main(void *f_)
>>          next_wake = conntrack_clean(ct, now);
>> 
>>          if (next_wake < now) {
>> -            poll_timer_wait_until(now + CT_CLEAN_MIN_INTERVAL);
>> +            poll_timer_wait_until(now + CT_CLEAN_MIN_INTERVAL_MS);
>>          } else {
>> -            poll_timer_wait_until(MAX(next_wake, now + CT_CLEAN_INTERVAL));
>> +            poll_timer_wait_until(next_wake);
>>          }
>>          latch_wait(&ct->clean_thread_exit);
>>          poll_block();
>> @@ -2088,6 +2189,12 @@ ct_endpoint_hash_add(uint32_t hash, const struct ct_endpoint *ep)
>>      return hash_add_bytes32(hash, (const uint32_t *) ep, sizeof *ep);
>>  }
>> 
>> +static uint32_t
>> +cached_key_hash(struct conn_key_node *n)
>> +{
>> +    return n->key_hash;
>> +}
>> +
>>  /* Symmetric */
>>  static uint32_t
>>  conn_key_hash(const struct conn_key *key, uint32_t basis)
>> @@ -2357,8 +2464,9 @@ next_addr_in_range_guarded(union ct_addr *curr, union ct_addr *min,
>>   *
>>   * If none can be found, return exhaustion to the caller. */
>>  static bool
>> -nat_get_unique_tuple(struct conntrack *ct, struct conn *conn,
>> -                     const struct nat_action_info_t *nat_info)
>> +nat_get_unique_tuple_lock(struct conntrack *ct, struct conn *conn,
>> +                          const struct nat_action_info_t *nat_info,
>> +                          uint32_t *rev_hash)
>>  {
>>      union ct_addr min_addr = {0}, max_addr = {0}, curr_addr = {0},
>>                    guard_addr = {0};
>> @@ -2392,10 +2500,15 @@ another_round:
>>                        nat_info->nat_action);
>> 
>>      if (!pat_proto) {
>> +        uint32_t key_hash = conn_key_hash(fwd_key, ct->hash_basis);
>> +        *rev_hash = conn_key_hash(rev_key, ct->hash_basis);
>> +
>> +        buckets_lock(ct, key_hash, *rev_hash);
>>          if (!conn_lookup(ct, rev_key,
>>                           time_msec(), NULL, NULL)) {
>>              return true;
>>          }
>> +        buckets_unlock(ct, key_hash, *rev_hash);
>> 
>>          goto next_addr;
>>      }
>> @@ -2404,10 +2517,15 @@ another_round:
>>          rev_key->src.port = htons(curr_dport);
>>          FOR_EACH_PORT_IN_RANGE(curr_sport, min_sport, max_sport) {
>>              rev_key->dst.port = htons(curr_sport);
>> +            uint32_t key_hash = conn_key_hash(fwd_key, ct->hash_basis);
>> +            *rev_hash = conn_key_hash(rev_key, ct->hash_basis);
>> +
>> +            buckets_lock(ct, key_hash, *rev_hash);
>>              if (!conn_lookup(ct, rev_key,
>>                               time_msec(), NULL, NULL)) {
>>                  return true;
>>              }
>> +            buckets_unlock(ct, key_hash, *rev_hash);
>>          }
>>      }
>> 
>> @@ -2615,20 +2733,39 @@ conntrack_dump_next(struct conntrack_dump *dump, struct ct_dpif_entry *entry)
>>  {
>>      struct conntrack *ct = dump->ct;
>>      long long now = time_msec();
>> +    struct ct_bucket *bucket;
>> 
>> -    for (;;) {
>> -        struct cmap_node *cm_node = cmap_next_position(&ct->conns,
>> -                                                       &dump->cm_pos);
>> -        if (!cm_node) {
>> -            break;
>> +    while (dump->bucket < CONNTRACK_BUCKETS) {
>> +        struct cmap_node *cm_node;
>> +        bucket = &ct->buckets[dump->bucket];
>> +
>> +        for (;;) {
>> +            cm_node = cmap_next_position(&bucket->conns,
>> +                                         &dump->cm_pos);
>> +            if (!cm_node) {
>> +                break;
>> +            }
>> +            struct conn_key_node *keyn;
>> +            struct conn *conn;
>> +            INIT_CONTAINER(keyn, cm_node, cm_node);
>> +            conn = CONTAINER_OF(keyn, struct conn, key_node[keyn->key.dir]);
>> +
>> +            if (conn_expired(conn, now)) {
>> +                /* XXX: ideally this should call conn_clean(). */
>> +                continue;
>> +            }
>> +
>> +            if ((!dump->filter_zone || keyn->key.zone == dump->zone) &&
>> +                (keyn->key.dir == CT_DIR_FWD)) {
>> +                conn_to_ct_dpif_entry(conn, entry, now);
>> +                break;
>> +            }
>>          }
>> -        struct conn_key_node *keyn;
>> -        struct conn *conn;
>> -        INIT_CONTAINER(keyn, cm_node, cm_node);
>> -        conn = CONTAINER_OF(keyn, struct conn, key_node[keyn->key.dir]);
>> -        if ((!dump->filter_zone || keyn->key.zone == dump->zone) &&
>> -            (keyn->key.dir == CT_DIR_FWD)) {
>> -            conn_to_ct_dpif_entry(conn, entry, now);
>> +
>> +        if (!cm_node) {
>> +            memset(&dump->cm_pos, 0, sizeof dump->cm_pos);
>> +            dump->bucket++;
>> +        } else {
>>              return 0;
>>          }
>>      }
>> @@ -2648,17 +2785,18 @@ conntrack_flush(struct conntrack *ct, const uint16_t *zone)
>>      struct conn_key_node *keyn;
>>      struct conn *conn;
>> 
>> -    ovs_mutex_lock(&ct->ct_lock);
>> -    CMAP_FOR_EACH (keyn, cm_node, &ct->conns) {
>> -        if (keyn->key.dir != CT_DIR_FWD) {
>> -            continue;
>> -        }
>> -        conn = CONTAINER_OF(keyn, struct conn, key_node[keyn->key.dir]);
>> -        if (!zone || *zone == keyn->key.zone) {
>> -            conn_clean(ct, conn);
>> +    for (unsigned i = 0; i < CONNTRACK_BUCKETS; i++) {
>> +        CMAP_FOR_EACH (keyn, cm_node, &ct->buckets[i].conns) {
>> +            if (keyn->key.dir != CT_DIR_FWD) {
>> +                continue;
>> +            }
>> +
>> +            conn = CONTAINER_OF(keyn, struct conn, key_node[keyn->key.dir]);
>> +            if (!zone || *zone == keyn->key.zone) {
>> +                conn_clean(ct, conn);
>> +            }
>>          }
>>      }
>> -    ovs_mutex_unlock(&ct->ct_lock);
>> 
>>      return 0;
>>  }
>> @@ -2667,15 +2805,19 @@ int
>>  conntrack_flush_tuple(struct conntrack *ct, const struct ct_dpif_tuple *tuple,
>>                        uint16_t zone)
>>  {
>> -    int error = 0;
>>      struct conn_key key;
>> -    struct conn *conn;
>> +    struct conn *conn = NULL;
>> +    unsigned bucket;
>> +    uint32_t hash;
>> +    int error = 0;
>> 
>>      memset(&key, 0, sizeof(key));
>>      tuple_to_conn_key(tuple, zone, &key);
>> -    ovs_mutex_lock(&ct->ct_lock);
>> -    conn_lookup(ct, &key, time_msec(), &conn, NULL);
>> 
>> +    hash = conn_key_hash(&key, ct->hash_basis);
>> +    bucket = hash_scale(hash);
>> +
>> +    conn_key_lookup(ct, bucket, &key, hash, time_msec(), &conn, NULL);
>>      if (conn) {
>>          conn_clean(ct, conn);
>>      } else {
>> @@ -2683,7 +2825,6 @@ conntrack_flush_tuple(struct conntrack *ct, const struct ct_dpif_tuple *tuple,
>>          error = ENOENT;
>>      }
>> 
>> -    ovs_mutex_unlock(&ct->ct_lock);
>>      return error;
>>  }
>> 
>
> Best regards,
> -- 
> Gaetan Rivet

Patch

diff --git a/lib/conntrack-private.h b/lib/conntrack-private.h
index ea5ba3d9e..a89ff96fa 100644
--- a/lib/conntrack-private.h
+++ b/lib/conntrack-private.h
@@ -95,6 +95,7 @@  struct alg_exp_node {
 
 struct conn_key_node {
     struct conn_key key;
+    uint32_t key_hash;
     struct cmap_node cm_node;
 };
 
@@ -102,7 +103,6 @@  struct conn {
     /* Immutable data. */
     struct conn_key_node key_node[CT_DIR_MAX];
     struct conn_key parent_key; /* Only used for orig_tuple support. */
-    struct ovs_list exp_node;
 
     uint16_t nat_action;
     char *alg;
@@ -121,7 +121,9 @@  struct conn {
     /* Mutable data. */
     bool seq_skew_dir; /* TCP sequence skew direction due to NATTing of FTP
                         * control messages; true if reply direction. */
-    bool cleaned; /* True if cleaned from expiry lists. */
+    atomic_flag cleaned; /* True if the entry was stale and one of the
+                          * cleaners (i.e. packet path or sweeper) took
+                          * charge of it. */
 
     /* Immutable data. */
     bool alg_related; /* True if alg data connection. */
@@ -192,10 +194,25 @@  enum ct_timeout {
     N_CT_TM
 };
 
-struct conntrack {
-    struct ovs_mutex ct_lock; /* Protects 2 following fields. */
+#define CONNTRACK_BUCKETS_SHIFT 10
+#define CONNTRACK_BUCKETS (1 << CONNTRACK_BUCKETS_SHIFT)
+
+struct ct_bucket {
+    /* Protects 'conns'. In case of natted conns, there's a high
+     * chance that the forward and the reverse key stand in different
+     * buckets. buckets_lock() should be the preferred way to acquire
+     * these locks (unless otherwise needed), as it deals with the
+     * acquisition order. */
+    struct ovs_mutex lock;
+    /* Contains the connections in the bucket, indexed by
+     * 'struct conn_key'. */
     struct cmap conns OVS_GUARDED;
-    struct ovs_list exp_lists[N_CT_TM] OVS_GUARDED;
+};
+
+struct conntrack {
+    struct ct_bucket buckets[CONNTRACK_BUCKETS];
+    unsigned int next_bucket;
+    struct ovs_mutex ct_lock;
     struct cmap zone_limits OVS_GUARDED;
     struct cmap timeout_policies OVS_GUARDED;
     uint32_t hash_basis; /* Salt for hashing a connection key. */
@@ -220,9 +237,10 @@  struct conntrack {
 };
 
 /* Lock acquisition order:
- *    1. 'ct_lock'
- *    2. 'conn->lock'
- *    3. 'resources_lock'
+ *    1. 'buckets[p1]->lock'
+ *    2. 'buckets[p2]->lock' (with p1 < p2)
+ *    3. 'conn->lock'
+ *    4. 'resources_lock'
  */
 
 extern struct ct_l4_proto ct_proto_tcp;
diff --git a/lib/conntrack-tp.c b/lib/conntrack-tp.c
index 9ecb06978..117810528 100644
--- a/lib/conntrack-tp.c
+++ b/lib/conntrack-tp.c
@@ -236,27 +236,6 @@  tm_to_ct_dpif_tp(enum ct_timeout tm)
     return CT_DPIF_TP_ATTR_MAX;
 }
 
-static void
-conn_update_expiration__(struct conntrack *ct, struct conn *conn,
-                         enum ct_timeout tm, long long now,
-                         uint32_t tp_value)
-    OVS_REQUIRES(conn->lock)
-{
-    ovs_mutex_unlock(&conn->lock);
-
-    ovs_mutex_lock(&ct->ct_lock);
-    ovs_mutex_lock(&conn->lock);
-    if (!conn->cleaned) {
-        conn->expiration = now + tp_value * 1000;
-        ovs_list_remove(&conn->exp_node);
-        ovs_list_push_back(&ct->exp_lists[tm], &conn->exp_node);
-    }
-    ovs_mutex_unlock(&conn->lock);
-    ovs_mutex_unlock(&ct->ct_lock);
-
-    ovs_mutex_lock(&conn->lock);
-}
-
 /* The conn entry lock must be held on entry and exit. */
 void
 conn_update_expiration(struct conntrack *ct, struct conn *conn,
@@ -266,42 +245,25 @@  conn_update_expiration(struct conntrack *ct, struct conn *conn,
     struct timeout_policy *tp;
     uint32_t val;
 
-    ovs_mutex_unlock(&conn->lock);
-
-    ovs_mutex_lock(&ct->ct_lock);
-    ovs_mutex_lock(&conn->lock);
     tp = timeout_policy_lookup(ct, conn->tp_id);
     if (tp) {
         val = tp->policy.attrs[tm_to_ct_dpif_tp(tm)];
     } else {
         val = ct_dpif_netdev_tp_def[tm_to_ct_dpif_tp(tm)];
     }
-    ovs_mutex_unlock(&conn->lock);
-    ovs_mutex_unlock(&ct->ct_lock);
 
-    ovs_mutex_lock(&conn->lock);
     VLOG_DBG_RL(&rl, "Update timeout %s zone=%u with policy id=%d "
                 "val=%u sec.",
                 ct_timeout_str[tm], conn->key_node[CT_DIR_FWD].key.zone,
                 conn->tp_id, val);
 
-    conn_update_expiration__(ct, conn, tm, now, val);
-}
-
-static void
-conn_init_expiration__(struct conntrack *ct, struct conn *conn,
-                       enum ct_timeout tm, long long now,
-                       uint32_t tp_value)
-{
-    conn->expiration = now + tp_value * 1000;
-    ovs_list_push_back(&ct->exp_lists[tm], &conn->exp_node);
+    conn->expiration = now + val * 1000;
 }
 
 /* ct_lock must be held. */
 void
 conn_init_expiration(struct conntrack *ct, struct conn *conn,
                      enum ct_timeout tm, long long now)
-    OVS_REQUIRES(ct->ct_lock)
 {
     struct timeout_policy *tp;
     uint32_t val;
@@ -317,5 +279,5 @@  conn_init_expiration(struct conntrack *ct, struct conn *conn,
                 ct_timeout_str[tm], conn->key_node[CT_DIR_FWD].key.zone,
                 conn->tp_id, val);
 
-    conn_init_expiration__(ct, conn, tm, now, val);
+    conn->expiration = now + val * 1000;
 }
diff --git a/lib/conntrack.c b/lib/conntrack.c
index a284c57c0..1c019af29 100644
--- a/lib/conntrack.c
+++ b/lib/conntrack.c
@@ -85,9 +85,12 @@  struct zone_limit {
     struct conntrack_zone_limit czl;
 };
 
+static unsigned hash_scale(uint32_t hash);
+static void conn_clean(struct conntrack *ct, struct conn *conn);
 static bool conn_key_extract(struct conntrack *, struct dp_packet *,
                              ovs_be16 dl_type, struct conn_lookup_ctx *,
                              uint16_t zone);
+static uint32_t cached_key_hash(struct conn_key_node *n);
 static uint32_t conn_key_hash(const struct conn_key *, uint32_t basis);
 static void conn_key_reverse(struct conn_key *);
 static bool valid_new(struct dp_packet *pkt, struct conn_key *);
@@ -109,8 +112,9 @@  static void set_label(struct dp_packet *, struct conn *,
 static void *clean_thread_main(void *f_);
 
 static bool
-nat_get_unique_tuple(struct conntrack *ct, struct conn *conn,
-                     const struct nat_action_info_t *nat_info);
+nat_get_unique_tuple_lock(struct conntrack *ct, struct conn *conn,
+                          const struct nat_action_info_t *nat_info,
+                          uint32_t *rev_hash);
 
 static uint8_t
 reverse_icmp_type(uint8_t type);
@@ -249,16 +253,17 @@  conntrack_init(void)
     ovs_rwlock_unlock(&ct->resources_lock);
 
     ovs_mutex_init_adaptive(&ct->ct_lock);
-    ovs_mutex_lock(&ct->ct_lock);
-    cmap_init(&ct->conns);
-    for (unsigned i = 0; i < ARRAY_SIZE(ct->exp_lists); i++) {
-        ovs_list_init(&ct->exp_lists[i]);
+
+    ct->next_bucket = 0;
+    for (unsigned i = 0; i < CONNTRACK_BUCKETS; i++) {
+        struct ct_bucket *bucket = &ct->buckets[i];
+        cmap_init(&bucket->conns);
+        ovs_mutex_init_recursive(&bucket->lock);
     }
+
     cmap_init(&ct->zone_limits);
     ct->zone_limit_seq = 0;
     timeout_policy_init(ct);
-    ovs_mutex_unlock(&ct->ct_lock);
-
     atomic_count_init(&ct->n_conn, 0);
     atomic_init(&ct->n_conn_limit, DEFAULT_N_CONN_LIMIT);
     atomic_init(&ct->tcp_seq_chk, true);
@@ -410,9 +415,9 @@  zone_limit_delete(struct conntrack *ct, uint16_t zone)
 }
 
 static void
-conn_clean(struct conntrack *ct, struct conn *conn)
-    OVS_REQUIRES(ct->ct_lock)
+conn_clean__(struct conntrack *ct, struct conn *conn)
 {
+    struct ct_bucket *bucket;
     struct zone_limit *zl;
     uint32_t hash;
 
@@ -420,8 +425,9 @@  conn_clean(struct conntrack *ct, struct conn *conn)
         expectation_clean(ct, &conn->key_node[CT_DIR_FWD].key);
     }
 
-    hash = conn_key_hash(&conn->key_node[CT_DIR_FWD].key, ct->hash_basis);
-    cmap_remove(&ct->conns, &conn->key_node[CT_DIR_FWD].cm_node, hash);
+    hash = cached_key_hash(&conn->key_node[CT_DIR_FWD]);
+    bucket = &ct->buckets[hash_scale(hash)];
+    cmap_remove(&bucket->conns, &conn->key_node[CT_DIR_FWD].cm_node, hash);
 
     zl = zone_limit_lookup(ct, conn->admit_zone);
     if (zl && zl->czl.zone_limit_seq == conn->zone_limit_seq) {
@@ -429,12 +435,10 @@  conn_clean(struct conntrack *ct, struct conn *conn)
     }
 
     if (conn->nat_action) {
-        hash = conn_key_hash(&conn->key_node[CT_DIR_REV].key,
-                             ct->hash_basis);
-        cmap_remove(&ct->conns, &conn->key_node[CT_DIR_REV].cm_node, hash);
+        hash = cached_key_hash(&conn->key_node[CT_DIR_REV]);
+        bucket = &ct->buckets[hash_scale(hash)];
+        cmap_remove(&bucket->conns, &conn->key_node[CT_DIR_REV].cm_node, hash);
     }
-    ovs_list_remove(&conn->exp_node);
-    conn->cleaned = true;
     ovsrcu_postpone(delete_conn, conn);
     atomic_count_dec(&ct->n_conn);
 }
@@ -446,22 +450,35 @@  void
 conntrack_destroy(struct conntrack *ct)
 {
     struct conn_key_node *keyn;
+    struct ct_bucket *bucket;
     struct conn *conn;
+    int i;
 
     latch_set(&ct->clean_thread_exit);
     pthread_join(ct->clean_thread, NULL);
     latch_destroy(&ct->clean_thread_exit);
 
-    ovs_mutex_lock(&ct->ct_lock);
-    CMAP_FOR_EACH (keyn, cm_node, &ct->conns) {
-        if (keyn->key.dir != CT_DIR_FWD) {
-            continue;
+    for (i = 0; i < CONNTRACK_BUCKETS; i++) {
+        bucket = &ct->buckets[i];
+        CMAP_FOR_EACH (keyn, cm_node, &bucket->conns) {
+            if (keyn->key.dir != CT_DIR_FWD) {
+                continue;
+            }
+            conn = CONTAINER_OF(keyn, struct conn, key_node[keyn->key.dir]);
+            conn_clean(ct, conn);
         }
-        conn = CONTAINER_OF(keyn, struct conn, key_node[keyn->key.dir]);
-        conn_clean(ct, conn);
     }
-    cmap_destroy(&ct->conns);
 
+    /* XXX: we need this loop because connections may be in multiple
+     * buckets.  The former loop should probably use conn_clean__()
+     * or an unlocked version of conn_clean(). */
+    for (i = 0; i < CONNTRACK_BUCKETS; i++) {
+        bucket = &ct->buckets[i];
+        ovs_mutex_destroy(&bucket->lock);
+        cmap_destroy(&ct->buckets[i].conns);
+    }
+
+    ovs_mutex_lock(&ct->ct_lock);
     struct zone_limit *zl;
     CMAP_FOR_EACH (zl, node, &ct->zone_limits) {
         uint32_t hash = zone_key_hash(zl->czl.zone, ct->hash_basis);
@@ -498,45 +515,108 @@  conntrack_destroy(struct conntrack *ct)
 }
 
 
+static unsigned hash_scale(uint32_t hash)
+{
+    return (hash >> (32 - CONNTRACK_BUCKETS_SHIFT)) % CONNTRACK_BUCKETS;
+}
+
 static bool
-conn_key_lookup(struct conntrack *ct, const struct conn_key *key,
-                uint32_t hash, long long now, struct conn **conn_out,
+conn_key_lookup(struct conntrack *ct, unsigned bucket,
+                const struct conn_key *key, uint32_t hash,
+                long long now, struct conn **conn_out,
                 bool *reply)
 {
+    struct ct_bucket *ctb = &ct->buckets[bucket];
     struct conn_key_node *keyn;
-    struct conn *conn = NULL;
     bool found = false;
+    struct conn *conn;
 
-    CMAP_FOR_EACH_WITH_HASH (keyn, cm_node, hash, &ct->conns) {
+    CMAP_FOR_EACH_WITH_HASH (keyn, cm_node, hash, &ctb->conns) {
         conn = CONTAINER_OF(keyn, struct conn, key_node[keyn->key.dir]);
+        if (conn_expired(conn, now)) {
+            conn_clean(ct, conn);
+            continue;
+        }
+
         for (int i = CT_DIR_FWD; i < CT_DIR_MAX; i++) {
-            if (!conn_key_cmp(&conn->key_node[i].key, key) &&
-                !conn_expired(conn, now)) {
+            if (!conn_key_cmp(&conn->key_node[i].key, key)) {
                 found = true;
                 if (reply) {
                     *reply = i;
                 }
-                goto out_found;
+
+                goto conn_out_found;
             }
         }
     }
 
-out_found:
-    if (found && conn_out) {
-        *conn_out = conn;
-    } else if (conn_out) {
-        *conn_out = NULL;
+conn_out_found:
+    if (conn_out) {
+        *conn_out = found ? conn : NULL;
     }
 
     return found;
 }
 
+static void
+buckets_unlock(struct conntrack *ct, uint32_t h1, uint32_t h2)
+{
+    unsigned p1 = hash_scale(h1),
+        p2 = hash_scale(h2);
+
+    if (p1 > p2) {
+        ovs_mutex_unlock(&ct->buckets[p1].lock);
+        ovs_mutex_unlock(&ct->buckets[p2].lock);
+    } else if (p1 < p2) {
+        ovs_mutex_unlock(&ct->buckets[p2].lock);
+        ovs_mutex_unlock(&ct->buckets[p1].lock);
+    } else {
+        ovs_mutex_unlock(&ct->buckets[p1].lock);
+    }
+}
+
+/* Acquires both locks in an ordered way. */
+static void
+buckets_lock(struct conntrack *ct, uint32_t h1, uint32_t h2)
+{
+    unsigned p1 = hash_scale(h1),
+        p2 = hash_scale(h2);
+
+    if (p1 < p2) {
+        ovs_mutex_lock(&ct->buckets[p1].lock);
+        ovs_mutex_lock(&ct->buckets[p2].lock);
+    } else if (p1 > p2) {
+        ovs_mutex_lock(&ct->buckets[p2].lock);
+        ovs_mutex_lock(&ct->buckets[p1].lock);
+    } else {
+        ovs_mutex_lock(&ct->buckets[p1].lock);
+    }
+}
+
+static void
+conn_clean(struct conntrack *ct, struct conn *conn)
+{
+    uint32_t h1, h2;
+
+    if (atomic_flag_test_and_set(&conn->cleaned)) {
+        return;
+    }
+
+    h1 = cached_key_hash(&conn->key_node[CT_DIR_FWD]);
+    h2 = cached_key_hash(&conn->key_node[CT_DIR_REV]);
+    buckets_lock(ct, h1, h2);
+    conn_clean__(ct, conn);
+    buckets_unlock(ct, h1, h2);
+}
+
 static bool
 conn_lookup(struct conntrack *ct, const struct conn_key *key,
             long long now, struct conn **conn_out, bool *reply)
 {
     uint32_t hash = conn_key_hash(key, ct->hash_basis);
-    return conn_key_lookup(ct, key, hash, now, conn_out, reply);
+    unsigned bucket = hash_scale(hash);
+
+    return conn_key_lookup(ct, bucket, key, hash, now, conn_out, reply);
 }
 
 static void
@@ -944,7 +1024,6 @@  conn_not_found(struct conntrack *ct, struct dp_packet *pkt,
                const struct nat_action_info_t *nat_action_info,
                const char *helper, const struct alg_exp_node *alg_exp,
                enum ct_alg_ctl_type ct_alg_ctl, uint32_t tp_id)
-    OVS_REQUIRES(ct->ct_lock)
 {
     struct conn *nc = NULL;
     uint32_t rev_hash = ctx->hash;
@@ -954,6 +1033,8 @@  conn_not_found(struct conntrack *ct, struct dp_packet *pkt,
         return nc;
     }
 
+    /* XXX: We are unlocked here, so we don't know
+     * if the tuple already exists in the table. */
     pkt->md.ct_state = CS_NEW;
 
     if (alg_exp) {
@@ -961,10 +1042,11 @@  conn_not_found(struct conntrack *ct, struct dp_packet *pkt,
     }
 
     if (commit) {
-        struct conn_key_node *fwd_key_node, *rev_key_node;
-
         struct zone_limit *zl = zone_limit_lookup_or_default(ct,
                                                              ctx->key.zone);
+        struct conn_key_node *fwd_key_node, *rev_key_node;
+        bool handle_tuple = false;
+
         if (zl && atomic_count_get(&zl->czl.count) >= zl->czl.limit) {
             return nc;
         }
@@ -1007,22 +1089,40 @@  conn_not_found(struct conntrack *ct, struct dp_packet *pkt,
                     nc->nat_action = NAT_ACTION_DST;
                 }
             } else {
-                bool nat_res = nat_get_unique_tuple(ct, nc, nat_action_info);
-
-                if (!nat_res) {
-                    goto nat_res_exhaustion;
-                }
+                handle_tuple = true;
             }
-
-            nat_packet(pkt, nc, ctx->icmp_related);
-            rev_hash = conn_key_hash(&rev_key_node->key, ct->hash_basis);
-            rev_key_node->key.dir = CT_DIR_REV;
-            cmap_insert(&ct->conns, &rev_key_node->cm_node, rev_hash);
         }
 
         ovs_mutex_init_adaptive(&nc->lock);
+        atomic_flag_clear(&nc->cleaned);
         fwd_key_node->key.dir = CT_DIR_FWD;
-        cmap_insert(&ct->conns, &fwd_key_node->cm_node, ctx->hash);
+        rev_key_node->key.dir = CT_DIR_REV;
+
+        if (handle_tuple) {
+            bool nat_res = nat_get_unique_tuple_lock(ct, nc, nat_action_info,
+                                                     &rev_hash);
+
+            if (!nat_res) {
+                goto out_error;
+            }
+        } else {
+            rev_hash = conn_key_hash(&rev_key_node->key, ct->hash_basis);
+            buckets_lock(ct, ctx->hash, rev_hash);
+        }
+
+        if (conn_lookup(ct, &ctx->key, now, NULL, NULL)) {
+            goto out_error_unlock;
+        }
+
+        fwd_key_node->key_hash = ctx->hash;
+        cmap_insert(&ct->buckets[hash_scale(ctx->hash)].conns,
+                    &fwd_key_node->cm_node, ctx->hash);
+        if (nat_action_info) {
+            rev_key_node->key_hash = rev_hash;
+            cmap_insert(&ct->buckets[hash_scale(rev_hash)].conns,
+                        &rev_key_node->cm_node, rev_hash);
+            nat_packet(pkt, nc, ctx->icmp_related);
+        }
 
         atomic_count_inc(&ct->n_conn);
         ctx->conn = nc; /* For completeness. */
@@ -1033,21 +1133,23 @@  conn_not_found(struct conntrack *ct, struct dp_packet *pkt,
         } else {
             nc->admit_zone = INVALID_ZONE;
         }
+        buckets_unlock(ct, ctx->hash, rev_hash);
     }
 
     return nc;
 
+out_error_unlock:
+    buckets_unlock(ct, ctx->hash, rev_hash);
     /* This would be a user error or a DOS attack.  A user error is prevented
      * by allocating enough combinations of NAT addresses when combined with
      * ephemeral ports.  A DOS attack should be protected against with
      * firewall rules or a separate firewall.  Also using zone partitioning
      * can limit DoS impact. */
-nat_res_exhaustion:
-    ovs_list_remove(&nc->exp_node);
+out_error:
+    ovs_mutex_destroy(&nc->lock);
     delete_conn_cmn(nc);
     static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
-    VLOG_WARN_RL(&rl, "Unable to NAT due to tuple space exhaustion - "
-                 "if DoS attack, use firewalling and/or zone partitioning.");
+    VLOG_WARN_RL(&rl, "Unable to insert a new connection.");
     return NULL;
 }
 
@@ -1082,12 +1184,7 @@  conn_update_state(struct conntrack *ct, struct dp_packet *pkt,
             pkt->md.ct_state = CS_INVALID;
             break;
         case CT_UPDATE_NEW:
-            ovs_mutex_lock(&ct->ct_lock);
-            if (conn_lookup(ct, &conn->key_node[CT_DIR_FWD].key,
-                            now, NULL, NULL)) {
-                conn_clean(ct, conn);
-            }
-            ovs_mutex_unlock(&ct->ct_lock);
+            conn_clean(ct, conn);
             create_new_conn = true;
             break;
         case CT_UPDATE_VALID_NEW:
@@ -1253,6 +1350,8 @@  static void
 initial_conn_lookup(struct conntrack *ct, struct conn_lookup_ctx *ctx,
                     long long now, bool natted)
 {
+    unsigned bucket = hash_scale(ctx->hash);
+
     if (natted) {
         /* If the packet has been already natted (e.g. a previous
          * action took place), retrieve it performing a lookup of its
@@ -1260,7 +1359,8 @@  initial_conn_lookup(struct conntrack *ct, struct conn_lookup_ctx *ctx,
         conn_key_reverse(&ctx->key);
     }
 
-    conn_key_lookup(ct, &ctx->key, ctx->hash, now, &ctx->conn, &ctx->reply);
+    conn_key_lookup(ct, bucket, &ctx->key, ctx->hash,
+                    now, &ctx->conn, &ctx->reply);
 
     if (natted) {
         if (OVS_LIKELY(ctx->conn)) {
@@ -1287,24 +1387,20 @@  process_one(struct conntrack *ct, struct dp_packet *pkt,
             ovs_be16 tp_src, ovs_be16 tp_dst, const char *helper,
             uint32_t tp_id)
 {
+    bool create_new_conn = false;
+
     /* Reset ct_state whenever entering a new zone. */
     if (pkt->md.ct_state && pkt->md.ct_zone != zone) {
         pkt->md.ct_state = 0;
     }
 
-    bool create_new_conn = false;
     initial_conn_lookup(ct, ctx, now, !!(pkt->md.ct_state &
                                          (CS_SRC_NAT | CS_DST_NAT)));
     struct conn *conn = ctx->conn;
 
     /* Delete found entry if in wrong direction. 'force' implies commit. */
     if (OVS_UNLIKELY(force && ctx->reply && conn)) {
-        ovs_mutex_lock(&ct->ct_lock);
-        if (conn_lookup(ct, &conn->key_node[CT_DIR_FWD].key,
-                        now, NULL, NULL)) {
-            conn_clean(ct, conn);
-        }
-        ovs_mutex_unlock(&ct->ct_lock);
+        conn_clean(ct, conn);
         conn = NULL;
     }
 
@@ -1338,7 +1434,6 @@  process_one(struct conntrack *ct, struct dp_packet *pkt,
     struct alg_exp_node alg_exp_entry;
 
     if (OVS_UNLIKELY(create_new_conn)) {
-
         ovs_rwlock_rdlock(&ct->resources_lock);
         alg_exp = expectation_lookup(&ct->alg_expectations, &ctx->key,
                                      ct->hash_basis,
@@ -1349,12 +1444,9 @@  process_one(struct conntrack *ct, struct dp_packet *pkt,
         }
         ovs_rwlock_unlock(&ct->resources_lock);
 
-        ovs_mutex_lock(&ct->ct_lock);
-        if (!conn_lookup(ct, &ctx->key, now, NULL, NULL)) {
-            conn = conn_not_found(ct, pkt, ctx, commit, now, nat_action_info,
-                                  helper, alg_exp, ct_alg_ctl, tp_id);
-        }
-        ovs_mutex_unlock(&ct->ct_lock);
+        conn = conn_not_found(ct, pkt, ctx, commit, now,
+                              nat_action_info, helper, alg_exp,
+                              ct_alg_ctl, tp_id);
     }
 
     write_ct_md(pkt, zone, conn, &ctx->key, alg_exp);
@@ -1467,83 +1559,92 @@  set_label(struct dp_packet *pkt, struct conn *conn,
 }
 
 
-/* Delete the expired connections from 'ctb', up to 'limit'. Returns the
- * earliest expiration time among the remaining connections in 'ctb'.  Returns
- * LLONG_MAX if 'ctb' is empty.  The return value might be smaller than 'now',
- * if 'limit' is reached */
+/* Deletes the expired connections found in 'bucket'.
+ * Returns the number of forward-direction connections
+ * examined in 'bucket', including the ones that were
+ * expired and cleaned.  The caller uses this count to
+ * enforce its per-iteration scanning budget. */
 static long long
-ct_sweep(struct conntrack *ct, long long now, size_t limit)
+sweep_bucket(struct conntrack *ct, struct ct_bucket *bucket,
+             long long now)
 {
-    struct conn *conn, *next;
-    long long min_expiration = LLONG_MAX;
-    size_t count = 0;
+    struct conn_key_node *keyn;
+    unsigned int conn_count = 0;
+    struct conn *conn;
+    long long expiration;
 
-    ovs_mutex_lock(&ct->ct_lock);
+    CMAP_FOR_EACH (keyn, cm_node, &bucket->conns) {
+        if (keyn->key.dir != CT_DIR_FWD) {
+            continue;
+        }
 
-    for (unsigned i = 0; i < N_CT_TM; i++) {
-        LIST_FOR_EACH_SAFE (conn, next, exp_node, &ct->exp_lists[i]) {
-            ovs_mutex_lock(&conn->lock);
-            if (now < conn->expiration || count >= limit) {
-                min_expiration = MIN(min_expiration, conn->expiration);
-                ovs_mutex_unlock(&conn->lock);
-                if (count >= limit) {
-                    /* Do not check other lists. */
-                    COVERAGE_INC(conntrack_long_cleanup);
-                    goto out;
-                }
-                break;
-            } else {
-                ovs_mutex_unlock(&conn->lock);
-                conn_clean(ct, conn);
-            }
-            count++;
+        conn = CONTAINER_OF(keyn, struct conn, key_node[keyn->key.dir]);
+        ovs_mutex_lock(&conn->lock);
+        expiration = conn->expiration;
+        ovs_mutex_unlock(&conn->lock);
+
+        if (now >= expiration) {
+            conn_clean(ct, conn);
         }
+
+        conn_count++;
     }
 
-out:
-    VLOG_DBG("conntrack cleanup %"PRIuSIZE" entries in %lld msec", count,
-             time_msec() - now);
-    ovs_mutex_unlock(&ct->ct_lock);
-    return min_expiration;
+    return conn_count;
 }
 
-/* Cleans up old connection entries from 'ct'.  Returns the time when the
- * next expiration might happen.  The return value might be smaller than
- * 'now', meaning that an internal limit has been reached, and some expired
- * connections have not been deleted. */
+/* Cleans up old connection entries from 'ct'.  Returns the next
+ * wake up time.  The return value might be smaller than 'now',
+ * meaning that an internal limit has been reached, that is, the
+ * table hasn't been entirely scanned. */
 static long long
 conntrack_clean(struct conntrack *ct, long long now)
 {
-    unsigned int n_conn_limit;
+    long long next_wakeup = now + 90 * 1000;
+    unsigned int n_conn_limit, i, count = 0;
+    size_t clean_end;
+
     atomic_read_relaxed(&ct->n_conn_limit, &n_conn_limit);
-    size_t clean_max = n_conn_limit > 10 ? n_conn_limit / 10 : 1;
-    long long min_exp = ct_sweep(ct, now, clean_max);
-    long long next_wakeup = MIN(min_exp, now + CT_DPIF_NETDEV_TP_MIN);
+    clean_end = n_conn_limit / 64;
+
+    for (i = ct->next_bucket; i < CONNTRACK_BUCKETS; i++) {
+        struct ct_bucket *bucket = &ct->buckets[i];
+
+        count += sweep_bucket(ct, bucket, now);
+
+        if (count > clean_end) {
+            next_wakeup = 0;
+            break;
+        }
+    }
+
+    ct->next_bucket = (i < CONNTRACK_BUCKETS) ? i : 0;
 
     return next_wakeup;
 }
 
 /* Cleanup:
  *
- * We must call conntrack_clean() periodically.  conntrack_clean() return
- * value gives an hint on when the next cleanup must be done (either because
- * there is an actual connection that expires, or because a new connection
- * might be created with the minimum timeout).
+ * We must call conntrack_clean() periodically.  conntrack_clean()'s
+ * return value gives a hint on when the next cleanup must be done
+ * (either because there is still work to do, or because a new
+ * connection might be created).
  *
  * The logic below has two goals:
  *
- * - We want to reduce the number of wakeups and batch connection cleanup
- *   when the load is not very high.  CT_CLEAN_INTERVAL ensures that if we
- *   are coping with the current cleanup tasks, then we wait at least
- *   5 seconds to do further cleanup.
+ * - When the load is high, we want to avoid hogging the CPU by
+ *   scanning all the buckets and their respective CMAPs at once.
+ *   For this reason, every cleanup batch aims to scan at most
+ *   n_conn_limit / 64 entries (more if a bucket contains many
+ *   entries) before yielding the CPU.  In this case, the next wake
+ *   up happens in CT_CLEAN_MIN_INTERVAL_MS and the scan resumes
+ *   from the first bucket not yet scanned.
  *
- * - We don't want to keep the map locked too long, as we might prevent
- *   traffic from flowing.  CT_CLEAN_MIN_INTERVAL ensures that if cleanup is
- *   behind, there is at least some 200ms blocks of time when the map will be
- *   left alone, so the datapath can operate unhindered.
- */
-#define CT_CLEAN_INTERVAL 5000 /* 5 seconds */
-#define CT_CLEAN_MIN_INTERVAL 200  /* 0.2 seconds */
+ * - We also don't want to scan the buckets too frequently, as going
+ *   through all the connections during high loads may be costly in
+ *   terms of CPU time.  In that case the next wake up is scheduled
+ *   90 seconds ahead. */
+#define CT_CLEAN_MIN_INTERVAL_MS 100  /* 0.1 seconds */
 
 static void *
 clean_thread_main(void *f_)
@@ -1556,9 +1657,9 @@  clean_thread_main(void *f_)
         next_wake = conntrack_clean(ct, now);
 
         if (next_wake < now) {
-            poll_timer_wait_until(now + CT_CLEAN_MIN_INTERVAL);
+            poll_timer_wait_until(now + CT_CLEAN_MIN_INTERVAL_MS);
         } else {
-            poll_timer_wait_until(MAX(next_wake, now + CT_CLEAN_INTERVAL));
+            poll_timer_wait_until(next_wake);
         }
         latch_wait(&ct->clean_thread_exit);
         poll_block();
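
As a worked example of the n_conn_limit / 64 budget described above:
with the default connection limit of 3,000,000 entries
(DEFAULT_N_CONN_LIMIT), clean_end comes out to 3,000,000 / 64 = 46,875.
When the running count of scanned forward key nodes exceeds that after
a bucket has been swept, conntrack_clean() stops, returns a next_wakeup
of 0, and clean_thread_main() wakes it again CT_CLEAN_MIN_INTERVAL_MS
(100 ms) later to resume from ct->next_bucket; if all buckets are swept
within the budget, the thread instead sleeps until the 90 s wake up.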
@@ -2088,6 +2189,12 @@  ct_endpoint_hash_add(uint32_t hash, const struct ct_endpoint *ep)
     return hash_add_bytes32(hash, (const uint32_t *) ep, sizeof *ep);
 }
 
+static uint32_t
+cached_key_hash(struct conn_key_node *n)
+{
+    return n->key_hash;
+}
+
 /* Symmetric */
 static uint32_t
 conn_key_hash(const struct conn_key *key, uint32_t basis)
@@ -2357,8 +2464,9 @@  next_addr_in_range_guarded(union ct_addr *curr, union ct_addr *min,
  *
  * If none can be found, return exhaustion to the caller. */
 static bool
-nat_get_unique_tuple(struct conntrack *ct, struct conn *conn,
-                     const struct nat_action_info_t *nat_info)
+nat_get_unique_tuple_lock(struct conntrack *ct, struct conn *conn,
+                          const struct nat_action_info_t *nat_info,
+                          uint32_t *rev_hash)
 {
     union ct_addr min_addr = {0}, max_addr = {0}, curr_addr = {0},
                   guard_addr = {0};
@@ -2392,10 +2500,15 @@  another_round:
                       nat_info->nat_action);
 
     if (!pat_proto) {
+        uint32_t key_hash = conn_key_hash(fwd_key, ct->hash_basis);
+        *rev_hash = conn_key_hash(rev_key, ct->hash_basis);
+
+        buckets_lock(ct, key_hash, *rev_hash);
         if (!conn_lookup(ct, rev_key,
                          time_msec(), NULL, NULL)) {
             return true;
         }
+        buckets_unlock(ct, key_hash, *rev_hash);
 
         goto next_addr;
     }
@@ -2404,10 +2517,15 @@  another_round:
         rev_key->src.port = htons(curr_dport);
         FOR_EACH_PORT_IN_RANGE(curr_sport, min_sport, max_sport) {
             rev_key->dst.port = htons(curr_sport);
+            uint32_t key_hash = conn_key_hash(fwd_key, ct->hash_basis);
+            *rev_hash = conn_key_hash(rev_key, ct->hash_basis);
+
+            buckets_lock(ct, key_hash, *rev_hash);
             if (!conn_lookup(ct, rev_key,
                              time_msec(), NULL, NULL)) {
                 return true;
             }
+            buckets_unlock(ct, key_hash, *rev_hash);
         }
     }
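
buckets_lock() and buckets_unlock(), used in the two hunks above, are
introduced earlier in this series and are not part of this hunk.  A
minimal sketch of the expected behavior, assuming each ct_bucket
carries its own 'lock' mutex (the field name is an assumption here) and
that the helpers always acquire the two bucket locks in ascending index
order so that no two threads can deadlock on the same pair:

    static void
    buckets_lock(struct conntrack *ct, uint32_t h1, uint32_t h2)
    {
        unsigned b1 = hash_scale(h1), b2 = hash_scale(h2);

        /* Fixed (ascending) lock order; a single lock is taken when
         * both hashes map to the same bucket. */
        if (b1 < b2) {
            ovs_mutex_lock(&ct->buckets[b1].lock);
            ovs_mutex_lock(&ct->buckets[b2].lock);
        } else if (b2 < b1) {
            ovs_mutex_lock(&ct->buckets[b2].lock);
            ovs_mutex_lock(&ct->buckets[b1].lock);
        } else {
            ovs_mutex_lock(&ct->buckets[b1].lock);
        }
    }

The actual helpers from the earlier patch may differ in detail; the
sketch only illustrates the lock-ordering invariant that makes holding
two bucket locks at once safe.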
 
@@ -2615,20 +2733,39 @@  conntrack_dump_next(struct conntrack_dump *dump, struct ct_dpif_entry *entry)
 {
     struct conntrack *ct = dump->ct;
     long long now = time_msec();
+    struct ct_bucket *bucket;
 
-    for (;;) {
-        struct cmap_node *cm_node = cmap_next_position(&ct->conns,
-                                                       &dump->cm_pos);
-        if (!cm_node) {
-            break;
+    while (dump->bucket < CONNTRACK_BUCKETS) {
+        struct cmap_node *cm_node;
+        bucket = &ct->buckets[dump->bucket];
+
+        for (;;) {
+            cm_node = cmap_next_position(&bucket->conns,
+                                         &dump->cm_pos);
+            if (!cm_node) {
+                break;
+            }
+            struct conn_key_node *keyn;
+            struct conn *conn;
+            INIT_CONTAINER(keyn, cm_node, cm_node);
+            conn = CONTAINER_OF(keyn, struct conn, key_node[keyn->key.dir]);
+
+            if (conn_expired(conn, now)) {
+                /* XXX: ideally this should call conn_clean(). */
+                continue;
+            }
+
+            if ((!dump->filter_zone || keyn->key.zone == dump->zone) &&
+                (keyn->key.dir == CT_DIR_FWD)) {
+                conn_to_ct_dpif_entry(conn, entry, now);
+                break;
+            }
         }
-        struct conn_key_node *keyn;
-        struct conn *conn;
-        INIT_CONTAINER(keyn, cm_node, cm_node);
-        conn = CONTAINER_OF(keyn, struct conn, key_node[keyn->key.dir]);
-        if ((!dump->filter_zone || keyn->key.zone == dump->zone) &&
-            (keyn->key.dir == CT_DIR_FWD)) {
-            conn_to_ct_dpif_entry(conn, entry, now);
+
+        if (!cm_node) {
+            memset(&dump->cm_pos, 0, sizeof dump->cm_pos);
+            dump->bucket++;
+        } else {
             return 0;
         }
     }
@@ -2648,17 +2785,18 @@  conntrack_flush(struct conntrack *ct, const uint16_t *zone)
     struct conn_key_node *keyn;
     struct conn *conn;
 
-    ovs_mutex_lock(&ct->ct_lock);
-    CMAP_FOR_EACH (keyn, cm_node, &ct->conns) {
-        if (keyn->key.dir != CT_DIR_FWD) {
-            continue;
-        }
-        conn = CONTAINER_OF(keyn, struct conn, key_node[keyn->key.dir]);
-        if (!zone || *zone == keyn->key.zone) {
-            conn_clean(ct, conn);
+    for (unsigned i = 0; i < CONNTRACK_BUCKETS; i++) {
+        CMAP_FOR_EACH (keyn, cm_node, &ct->buckets[i].conns) {
+            if (keyn->key.dir != CT_DIR_FWD) {
+                continue;
+            }
+
+            conn = CONTAINER_OF(keyn, struct conn, key_node[keyn->key.dir]);
+            if (!zone || *zone == keyn->key.zone) {
+                conn_clean(ct, conn);
+            }
         }
     }
-    ovs_mutex_unlock(&ct->ct_lock);
 
     return 0;
 }
@@ -2667,15 +2805,19 @@  int
 conntrack_flush_tuple(struct conntrack *ct, const struct ct_dpif_tuple *tuple,
                       uint16_t zone)
 {
-    int error = 0;
     struct conn_key key;
-    struct conn *conn;
+    struct conn *conn = NULL;
+    unsigned bucket;
+    uint32_t hash;
+    int error = 0;
 
     memset(&key, 0, sizeof(key));
     tuple_to_conn_key(tuple, zone, &key);
-    ovs_mutex_lock(&ct->ct_lock);
-    conn_lookup(ct, &key, time_msec(), &conn, NULL);
 
+    hash = conn_key_hash(&key, ct->hash_basis);
+    bucket = hash_scale(hash);
+
+    conn_key_lookup(ct, bucket, &key, hash, time_msec(), &conn, NULL);
     if (conn) {
         conn_clean(ct, conn);
     } else {
@@ -2683,7 +2825,6 @@  conntrack_flush_tuple(struct conntrack *ct, const struct ct_dpif_tuple *tuple,
         error = ENOENT;
     }
 
-    ovs_mutex_unlock(&ct->ct_lock);
     return error;
 }
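
hash_scale(), used just above to pick the bucket for the lookup, is
defined earlier in this series and maps a conn key hash to one of the
CONNTRACK_BUCKETS indexes.  A minimal sketch, assuming
CONNTRACK_BUCKETS is a power of two so the mapping is a plain mask:

    static inline unsigned
    hash_scale(uint32_t hash)
    {
        /* Assumes CONNTRACK_BUCKETS is a power of two; a modulo
         * would be needed otherwise. */
        return hash & (CONNTRACK_BUCKETS - 1);
    }

The only property the code here relies on is that a given key hash
always selects the same bucket, so that lookup, flush, and the sweeper
all agree on where a conn_key_node is stored.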
 
diff --git a/tests/system-traffic.at b/tests/system-traffic.at
index d79753a99..9a48a9a7f 100644
--- a/tests/system-traffic.at
+++ b/tests/system-traffic.at
@@ -4611,9 +4611,8 @@  AT_CHECK([ovs-appctl dpctl/dump-conntrack | FORMAT_CT(10.1.1.2) | sed -e 's/dst=
 tcp,orig=(src=10.1.1.1,dst=10.1.1.2,sport=<cleared>,dport=<cleared>),reply=(src=10.1.1.2,dst=10.1.1.2XX,sport=<cleared>,dport=<cleared>),zone=1,protoinfo=(state=<cleared>)
 ])
 
-OVS_TRAFFIC_VSWITCHD_STOP(["dnl
-/Unable to NAT due to tuple space exhaustion - if DoS attack, use firewalling and\/or zone partitioning./d
-/Dropped .* log messages in last .* seconds \(most recently, .* seconds ago\) due to excessive rate/d"])
+OVS_TRAFFIC_VSWITCHD_STOP(["dnl
+/Unable to insert a new connection./d"])
 AT_CLEANUP
 
 AT_SETUP([conntrack - more complex SNAT])