diff mbox series

[ovs-dev,v6,3/5] conntrack: Replace timeout based expiration lists with rculists.

Message ID 165669925241.2104762.12417592328400309216.stgit@fed.void
State Superseded
Headers show
Series conntrack: Improve multithread scalability. | expand

Checks

Context Check Description
ovsrobot/apply-robot success apply and check: success
ovsrobot/github-robot-_Build_and_Test success github build: passed
ovsrobot/intel-ovs-compilation success test: success

Commit Message

Paolo Valerio July 1, 2022, 6:14 p.m. UTC
From: Gaetan Rivet <grive@u256.net>

This patch aims to replace the expiration lists as, due to the way
they are used, besides being a source of contention, they have a known
issue when used with non-default policies for different zones that
could lead to retaining expired connections potentially for a long
time.

This patch replaces them with an array of rculist used to distribute
all the newly created connections in order to, during the sweeping
phase, scan them without locking, and evict the expired connections
only locking during the actual removal.  This allows to reduce the
contention introduced by the pushback performed at every packet
update, also solving the issue related to zones and timeout policies.

Signed-off-by: Gaetan Rivet <grive@u256.net>
Co-authored-by: Paolo Valerio <pvalerio@redhat.com>
Signed-off-by: Paolo Valerio <pvalerio@redhat.com>
---
v6:
- minor function renaming
- removed conn->lock in conn_clean() as this was unneeded.
- minor commit message rephrase
---
 lib/conntrack-private.h |   84 +++++++++++++++++---------
 lib/conntrack-tp.c      |   44 +-------------
 lib/conntrack.c         |  152 +++++++++++++++++++++++------------------------
 3 files changed, 133 insertions(+), 147 deletions(-)

Comments

wenxu July 3, 2022, 8:03 a.m. UTC | #1
Hi Paolo,


There are two small question.
First the ct_lock lock/unlock as below maybe also can be dropped with this patch ?


       ovs_mutex_lock(&ct->ct_lock);
        if (!conn_lookup(ct, &ctx->key, now, NULL, NULL)) {
            conn = conn_not_found(ct, pkt, ctx, commit, now, nat_action_info,
                                  helper, alg_exp, ct_alg_ctl, tp_id);
        }    
        ovs_mutex_unlock(&ct->ct_lock);






Second one  the zone_limit_lookup can be outof the conn_clean__ as below?


static void
conn_clean__(struct conntrack *ct, struct conn *conn)
    OVS_REQUIRES(ct->ct_lock)
{
    if (conn->alg) {
        expectation_clean(ct, &conn->key);
    }


    uint32_t hash = conn_key_hash(&conn->key, ct->hash_basis);
    cmap_remove(&ct->conns, &conn->cm_node, hash);


    struct zone_limit *zl = zone_limit_lookup(ct, conn->admit_zone);
    if (zl && zl->czl.zone_limit_seq == conn->zone_limit_seq) {
        atomic_count_dec(&zl->czl.count);
    }
}






BR
wenxu











At 2022-07-02 02:14:12, "Paolo Valerio" <pvalerio@redhat.com> wrote:
>From: Gaetan Rivet <grive@u256.net>
>
>This patch aims to replace the expiration lists as, due to the way
>they are used, besides being a source of contention, they have a known
>issue when used with non-default policies for different zones that
>could lead to retaining expired connections potentially for a long
>time.
>
>This patch replaces them with an array of rculist used to distribute
>all the newly created connections in order to, during the sweeping
>phase, scan them without locking, and evict the expired connections
>only locking during the actual removal.  This allows to reduce the
>contention introduced by the pushback performed at every packet
>update, also solving the issue related to zones and timeout policies.
>
>Signed-off-by: Gaetan Rivet <grive@u256.net>
>Co-authored-by: Paolo Valerio <pvalerio@redhat.com>
>Signed-off-by: Paolo Valerio <pvalerio@redhat.com>
>---
>v6:
>- minor function renaming
>- removed conn->lock in conn_clean() as this was unneeded.
>- minor commit message rephrase
>---
> lib/conntrack-private.h |   84 +++++++++++++++++---------
> lib/conntrack-tp.c      |   44 +-------------
> lib/conntrack.c         |  152 +++++++++++++++++++++++------------------------
> 3 files changed, 133 insertions(+), 147 deletions(-)
>
>diff --git a/lib/conntrack-private.h b/lib/conntrack-private.h
>index 34c688821..676f58d83 100644
>--- a/lib/conntrack-private.h
>+++ b/lib/conntrack-private.h
>@@ -29,6 +29,7 @@
> #include "openvswitch/list.h"
> #include "openvswitch/types.h"
> #include "packets.h"
>+#include "rculist.h"
> #include "unaligned.h"
> #include "dp-packet.h"
> 
>@@ -86,6 +87,31 @@ struct alg_exp_node {
>     bool nat_rpl_dst;
> };
> 
>+/* Timeouts: all the possible timeout states passed to update_expiration()
>+ * are listed here. The name will be prefix by CT_TM_ and the value is in
>+ * milliseconds */
>+#define CT_TIMEOUTS \
>+    CT_TIMEOUT(TCP_FIRST_PACKET) \
>+    CT_TIMEOUT(TCP_OPENING) \
>+    CT_TIMEOUT(TCP_ESTABLISHED) \
>+    CT_TIMEOUT(TCP_CLOSING) \
>+    CT_TIMEOUT(TCP_FIN_WAIT) \
>+    CT_TIMEOUT(TCP_CLOSED) \
>+    CT_TIMEOUT(OTHER_FIRST) \
>+    CT_TIMEOUT(OTHER_MULTIPLE) \
>+    CT_TIMEOUT(OTHER_BIDIR) \
>+    CT_TIMEOUT(ICMP_FIRST) \
>+    CT_TIMEOUT(ICMP_REPLY)
>+
>+enum ct_timeout {
>+#define CT_TIMEOUT(NAME) CT_TM_##NAME,
>+    CT_TIMEOUTS
>+#undef CT_TIMEOUT
>+    N_CT_TM
>+};
>+
>+#define EXP_LISTS 100
>+
> enum OVS_PACKED_ENUM ct_conn_type {
>     CT_CONN_TYPE_DEFAULT,
>     CT_CONN_TYPE_UN_NAT,
>@@ -96,11 +122,16 @@ struct conn {
>     struct conn_key key;
>     struct conn_key rev_key;
>     struct conn_key parent_key; /* Only used for orig_tuple support. */
>-    struct ovs_list exp_node;
>     struct cmap_node cm_node;
>     uint16_t nat_action;
>     char *alg;
>     struct conn *nat_conn; /* The NAT 'conn' context, if there is one. */
>+    atomic_flag reclaimed; /* False during the lifetime of the connection,
>+                            * True as soon as a thread has started freeing
>+                            * its memory. */
>+
>+    /* Inserted once by a PMD, then managed by the 'ct_clean' thread. */
>+    struct rculist node;
> 
>     /* Mutable data. */
>     struct ovs_mutex lock; /* Guards all mutable fields. */
>@@ -116,7 +147,6 @@ struct conn {
>     /* Mutable data. */
>     bool seq_skew_dir; /* TCP sequence skew direction due to NATTing of FTP
>                         * control messages; true if reply direction. */
>-    bool cleaned; /* True if cleaned from expiry lists. */
> 
>     /* Immutable data. */
>     bool alg_related; /* True if alg data connection. */
>@@ -132,22 +162,6 @@ enum ct_update_res {
>     CT_UPDATE_VALID_NEW,
> };
> 
>-/* Timeouts: all the possible timeout states passed to update_expiration()
>- * are listed here. The name will be prefix by CT_TM_ and the value is in
>- * milliseconds */
>-#define CT_TIMEOUTS \
>-    CT_TIMEOUT(TCP_FIRST_PACKET) \
>-    CT_TIMEOUT(TCP_OPENING) \
>-    CT_TIMEOUT(TCP_ESTABLISHED) \
>-    CT_TIMEOUT(TCP_CLOSING) \
>-    CT_TIMEOUT(TCP_FIN_WAIT) \
>-    CT_TIMEOUT(TCP_CLOSED) \
>-    CT_TIMEOUT(OTHER_FIRST) \
>-    CT_TIMEOUT(OTHER_MULTIPLE) \
>-    CT_TIMEOUT(OTHER_BIDIR) \
>-    CT_TIMEOUT(ICMP_FIRST) \
>-    CT_TIMEOUT(ICMP_REPLY)
>-
> #define NAT_ACTION_SNAT_ALL (NAT_ACTION_SRC | NAT_ACTION_SRC_PORT)
> #define NAT_ACTION_DNAT_ALL (NAT_ACTION_DST | NAT_ACTION_DST_PORT)
> 
>@@ -181,22 +195,17 @@ enum ct_ephemeral_range {
> #define FOR_EACH_PORT_IN_RANGE(curr, min, max) \
>     FOR_EACH_PORT_IN_RANGE__(curr, min, max, OVS_JOIN(idx, __COUNTER__))
> 
>-enum ct_timeout {
>-#define CT_TIMEOUT(NAME) CT_TM_##NAME,
>-    CT_TIMEOUTS
>-#undef CT_TIMEOUT
>-    N_CT_TM
>-};
>-
> struct conntrack {
>     struct ovs_mutex ct_lock; /* Protects 2 following fields. */
>     struct cmap conns OVS_GUARDED;
>-    struct ovs_list exp_lists[N_CT_TM] OVS_GUARDED;
>+    struct rculist exp_lists[EXP_LISTS];
>     struct cmap zone_limits OVS_GUARDED;
>     struct cmap timeout_policies OVS_GUARDED;
>     uint32_t hash_basis; /* Salt for hashing a connection key. */
>     pthread_t clean_thread; /* Periodically cleans up connection tracker. */
>     struct latch clean_thread_exit; /* To destroy the 'clean_thread'. */
>+    atomic_uint ct_next_list;
>+    unsigned int next_sweep;
> 
>     /* Counting connections. */
>     atomic_count n_conn; /* Number of connections currently tracked. */
>@@ -216,8 +225,8 @@ struct conntrack {
> };
> 
> /* Lock acquisition order:
>- *    1. 'ct_lock'
>- *    2. 'conn->lock'
>+ *    1. 'conn->lock'
>+ *    2. 'ct_lock'
>  *    3. 'resources_lock'
>  */
> 
>@@ -237,4 +246,23 @@ struct ct_l4_proto {
>                                struct ct_dpif_protoinfo *);
> };
> 
>+static unsigned int
>+ct_next_list(struct conntrack *ct)
>+{
>+    unsigned int old;
>+
>+    atomic_add_relaxed(&ct->ct_next_list, 1u, &old);
>+
>+    return old % EXP_LISTS;
>+}
>+
>+static inline void
>+conn_expire_push_front(struct conntrack *ct, struct conn *conn)
>+    OVS_REQUIRES(ct->ct_lock)
>+{
>+    unsigned int next = ct_next_list(ct);
>+
>+    rculist_push_front(&ct->exp_lists[next], &conn->node);
>+}
>+
> #endif /* conntrack-private.h */
>diff --git a/lib/conntrack-tp.c b/lib/conntrack-tp.c
>index c2245038b..7b8f9007b 100644
>--- a/lib/conntrack-tp.c
>+++ b/lib/conntrack-tp.c
>@@ -236,27 +236,6 @@ tm_to_ct_dpif_tp(enum ct_timeout tm)
>     return CT_DPIF_TP_ATTR_MAX;
> }
> 
>-static void
>-conn_update_expiration__(struct conntrack *ct, struct conn *conn,
>-                         enum ct_timeout tm, long long now,
>-                         uint32_t tp_value)
>-    OVS_REQUIRES(conn->lock)
>-{
>-    ovs_mutex_unlock(&conn->lock);
>-
>-    ovs_mutex_lock(&ct->ct_lock);
>-    ovs_mutex_lock(&conn->lock);
>-    if (!conn->cleaned) {
>-        conn->expiration = now + tp_value * 1000;
>-        ovs_list_remove(&conn->exp_node);
>-        ovs_list_push_back(&ct->exp_lists[tm], &conn->exp_node);
>-    }
>-    ovs_mutex_unlock(&conn->lock);
>-    ovs_mutex_unlock(&ct->ct_lock);
>-
>-    ovs_mutex_lock(&conn->lock);
>-}
>-
> /* The conn entry lock must be held on entry and exit. */
> void
> conn_update_expiration(struct conntrack *ct, struct conn *conn,
>@@ -266,41 +245,22 @@ conn_update_expiration(struct conntrack *ct, struct conn *conn,
>     struct timeout_policy *tp;
>     uint32_t val;
> 
>-    ovs_mutex_unlock(&conn->lock);
>-
>-    ovs_mutex_lock(&ct->ct_lock);
>-    ovs_mutex_lock(&conn->lock);
>     tp = timeout_policy_lookup(ct, conn->tp_id);
>     if (tp) {
>         val = tp->policy.attrs[tm_to_ct_dpif_tp(tm)];
>     } else {
>         val = ct_dpif_netdev_tp_def[tm_to_ct_dpif_tp(tm)];
>     }
>-    ovs_mutex_unlock(&conn->lock);
>-    ovs_mutex_unlock(&ct->ct_lock);
>-
>-    ovs_mutex_lock(&conn->lock);
>     VLOG_DBG_RL(&rl, "Update timeout %s zone=%u with policy id=%d "
>                 "val=%u sec.",
>                 ct_timeout_str[tm], conn->key.zone, conn->tp_id, val);
> 
>-    conn_update_expiration__(ct, conn, tm, now, val);
>+    conn->expiration = now + val * 1000;
> }
> 
>-static void
>-conn_init_expiration__(struct conntrack *ct, struct conn *conn,
>-                       enum ct_timeout tm, long long now,
>-                       uint32_t tp_value)
>-{
>-    conn->expiration = now + tp_value * 1000;
>-    ovs_list_push_back(&ct->exp_lists[tm], &conn->exp_node);
>-}
>-
>-/* ct_lock must be held. */
> void
> conn_init_expiration(struct conntrack *ct, struct conn *conn,
>                      enum ct_timeout tm, long long now)
>-    OVS_REQUIRES(ct->ct_lock)
> {
>     struct timeout_policy *tp;
>     uint32_t val;
>@@ -315,5 +275,5 @@ conn_init_expiration(struct conntrack *ct, struct conn *conn,
>     VLOG_DBG_RL(&rl, "Init timeout %s zone=%u with policy id=%d val=%u sec.",
>                 ct_timeout_str[tm], conn->key.zone, conn->tp_id, val);
> 
>-    conn_init_expiration__(ct, conn, tm, now, val);
>+    conn->expiration = now + val * 1000;
> }
>diff --git a/lib/conntrack.c b/lib/conntrack.c
>index 38ddc9b91..819c356c1 100644
>--- a/lib/conntrack.c
>+++ b/lib/conntrack.c
>@@ -39,12 +39,12 @@
> #include "ovs-thread.h"
> #include "openvswitch/poll-loop.h"
> #include "random.h"
>+#include "rculist.h"
> #include "timeval.h"
> 
> VLOG_DEFINE_THIS_MODULE(conntrack);
> 
> COVERAGE_DEFINE(conntrack_full);
>-COVERAGE_DEFINE(conntrack_long_cleanup);
> COVERAGE_DEFINE(conntrack_l3csum_err);
> COVERAGE_DEFINE(conntrack_l4csum_err);
> COVERAGE_DEFINE(conntrack_lookup_natted_miss);
>@@ -94,9 +94,8 @@ static bool valid_new(struct dp_packet *pkt, struct conn_key *);
> static struct conn *new_conn(struct conntrack *ct, struct dp_packet *pkt,
>                              struct conn_key *, long long now,
>                              uint32_t tp_id);
>-static void delete_conn_cmn(struct conn *);
>+static void delete_conn__(struct conn *);
> static void delete_conn(struct conn *);
>-static void delete_conn_one(struct conn *conn);
> static enum ct_update_res conn_update(struct conntrack *ct, struct conn *conn,
>                                       struct dp_packet *pkt,
>                                       struct conn_lookup_ctx *ctx,
>@@ -309,7 +308,7 @@ conntrack_init(void)
>     ovs_mutex_lock(&ct->ct_lock);
>     cmap_init(&ct->conns);
>     for (unsigned i = 0; i < ARRAY_SIZE(ct->exp_lists); i++) {
>-        ovs_list_init(&ct->exp_lists[i]);
>+        rculist_init(&ct->exp_lists[i]);
>     }
>     cmap_init(&ct->zone_limits);
>     ct->zone_limit_seq = 0;
>@@ -319,6 +318,7 @@ conntrack_init(void)
>     atomic_count_init(&ct->n_conn, 0);
>     atomic_init(&ct->n_conn_limit, DEFAULT_N_CONN_LIMIT);
>     atomic_init(&ct->tcp_seq_chk, true);
>+    atomic_init(&ct->ct_next_list, 0);
>     latch_init(&ct->clean_thread_exit);
>     ct->clean_thread = ovs_thread_create("ct_clean", clean_thread_main, ct);
>     ct->ipf = ipf_init();
>@@ -467,7 +467,7 @@ zone_limit_delete(struct conntrack *ct, uint16_t zone)
> }
> 
> static void
>-conn_clean_cmn(struct conntrack *ct, struct conn *conn)
>+conn_clean__(struct conntrack *ct, struct conn *conn)
>     OVS_REQUIRES(ct->ct_lock)
> {
>     if (conn->alg) {
>@@ -487,32 +487,34 @@ conn_clean_cmn(struct conntrack *ct, struct conn *conn)
>  * removes the associated nat 'conn' from the lookup datastructures. */
> static void
> conn_clean(struct conntrack *ct, struct conn *conn)
>-    OVS_REQUIRES(ct->ct_lock)
>+    OVS_EXCLUDED(conn->lock, ct->ct_lock)
> {
>     ovs_assert(conn->conn_type == CT_CONN_TYPE_DEFAULT);
> 
>-    conn_clean_cmn(ct, conn);
>+    if (atomic_flag_test_and_set(&conn->reclaimed)) {
>+        return;
>+    }
>+
>+    ovs_mutex_lock(&ct->ct_lock);
>+    conn_clean__(ct, conn);
>     if (conn->nat_conn) {
>         uint32_t hash = conn_key_hash(&conn->nat_conn->key, ct->hash_basis);
>         cmap_remove(&ct->conns, &conn->nat_conn->cm_node, hash);
>     }
>-    ovs_list_remove(&conn->exp_node);
>-    conn->cleaned = true;
>+
>+    rculist_remove(&conn->node);
>+    ovs_mutex_unlock(&ct->ct_lock);
>+
>     ovsrcu_postpone(delete_conn, conn);
>     atomic_count_dec(&ct->n_conn);
> }
> 
> static void
>-conn_clean_one(struct conntrack *ct, struct conn *conn)
>-    OVS_REQUIRES(ct->ct_lock)
>+conn_force_expire(struct conn *conn)
> {
>-    conn_clean_cmn(ct, conn);
>-    if (conn->conn_type == CT_CONN_TYPE_DEFAULT) {
>-        ovs_list_remove(&conn->exp_node);
>-        conn->cleaned = true;
>-        atomic_count_dec(&ct->n_conn);
>-    }
>-    ovsrcu_postpone(delete_conn_one, conn);
>+    ovs_mutex_lock(&conn->lock);
>+    conn->expiration = 0;
>+    ovs_mutex_unlock(&conn->lock);
> }
> 
> /* Destroys the connection tracker 'ct' and frees all the allocated memory.
>@@ -522,15 +524,16 @@ void
> conntrack_destroy(struct conntrack *ct)
> {
>     struct conn *conn;
>+
>     latch_set(&ct->clean_thread_exit);
>     pthread_join(ct->clean_thread, NULL);
>     latch_destroy(&ct->clean_thread_exit);
> 
>-    ovs_mutex_lock(&ct->ct_lock);
>-    CMAP_FOR_EACH (conn, cm_node, &ct->conns) {
>-        conn_clean_one(ct, conn);
>+    for (unsigned i = 0; i < EXP_LISTS; i++) {
>+        RCULIST_FOR_EACH (conn, node, &ct->exp_lists[i]) {
>+            conn_clean(ct, conn);
>+        }
>     }
>-    cmap_destroy(&ct->conns);
> 
>     struct zone_limit *zl;
>     CMAP_FOR_EACH (zl, node, &ct->zone_limits) {
>@@ -539,7 +542,6 @@ conntrack_destroy(struct conntrack *ct)
>         cmap_remove(&ct->zone_limits, &zl->node, hash);
>         ovsrcu_postpone(free, zl);
>     }
>-    cmap_destroy(&ct->zone_limits);
> 
>     struct timeout_policy *tp;
>     CMAP_FOR_EACH (tp, node, &ct->timeout_policies) {
>@@ -548,6 +550,11 @@ conntrack_destroy(struct conntrack *ct)
>         cmap_remove(&ct->timeout_policies, &tp->node, hash);
>         ovsrcu_postpone(free, tp);
>     }
>+
>+    ovs_mutex_lock(&ct->ct_lock);
>+
>+    cmap_destroy(&ct->conns);
>+    cmap_destroy(&ct->zone_limits);
>     cmap_destroy(&ct->timeout_policies);
> 
>     ovs_mutex_unlock(&ct->ct_lock);
>@@ -1087,7 +1094,9 @@ conn_not_found(struct conntrack *ct, struct dp_packet *pkt,
>         nc->nat_conn = nat_conn;
>         ovs_mutex_init_adaptive(&nc->lock);
>         nc->conn_type = CT_CONN_TYPE_DEFAULT;
>+        atomic_flag_clear(&nc->reclaimed);
>         cmap_insert(&ct->conns, &nc->cm_node, ctx->hash);
>+        conn_expire_push_front(ct, nc);
>         atomic_count_inc(&ct->n_conn);
>         ctx->conn = nc; /* For completeness. */
>         if (zl) {
>@@ -1108,8 +1117,7 @@ conn_not_found(struct conntrack *ct, struct dp_packet *pkt,
>      * can limit DoS impact. */
> nat_res_exhaustion:
>     free(nat_conn);
>-    ovs_list_remove(&nc->exp_node);
>-    delete_conn_cmn(nc);
>+    delete_conn__(nc);
>     static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
>     VLOG_WARN_RL(&rl, "Unable to NAT due to tuple space exhaustion - "
>                  "if DoS attack, use firewalling and/or zone partitioning.");
>@@ -1148,11 +1156,9 @@ conn_update_state(struct conntrack *ct, struct dp_packet *pkt,
>             pkt->md.ct_state = CS_INVALID;
>             break;
>         case CT_UPDATE_NEW:
>-            ovs_mutex_lock(&ct->ct_lock);
>             if (conn_lookup(ct, &conn->key, now, NULL, NULL)) {
>-                conn_clean(ct, conn);
>+                conn_force_expire(conn);
>             }
>-            ovs_mutex_unlock(&ct->ct_lock);
>             create_new_conn = true;
>             break;
>         case CT_UPDATE_VALID_NEW:
>@@ -1363,11 +1369,9 @@ process_one(struct conntrack *ct, struct dp_packet *pkt,
> 
>     /* Delete found entry if in wrong direction. 'force' implies commit. */
>     if (OVS_UNLIKELY(force && ctx->reply && conn)) {
>-        ovs_mutex_lock(&ct->ct_lock);
>         if (conn_lookup(ct, &conn->key, now, NULL, NULL)) {
>-            conn_clean(ct, conn);
>+            conn_force_expire(conn);
>         }
>-        ovs_mutex_unlock(&ct->ct_lock);
>         conn = NULL;
>     }
> 
>@@ -1553,39 +1557,21 @@ set_label(struct dp_packet *pkt, struct conn *conn,
>  * LLONG_MAX if 'ctb' is empty.  The return value might be smaller than 'now',
>  * if 'limit' is reached */
> static long long
>-ct_sweep(struct conntrack *ct, long long now, size_t limit)
>+ct_sweep(struct conntrack *ct, struct rculist *list, long long now)
>+    OVS_NO_THREAD_SAFETY_ANALYSIS
> {
>     struct conn *conn;
>-    long long min_expiration = LLONG_MAX;
>     size_t count = 0;
> 
>-    ovs_mutex_lock(&ct->ct_lock);
>-
>-    for (unsigned i = 0; i < N_CT_TM; i++) {
>-        LIST_FOR_EACH_SAFE (conn, exp_node, &ct->exp_lists[i]) {
>-            ovs_mutex_lock(&conn->lock);
>-            if (now < conn->expiration || count >= limit) {
>-                min_expiration = MIN(min_expiration, conn->expiration);
>-                ovs_mutex_unlock(&conn->lock);
>-                if (count >= limit) {
>-                    /* Do not check other lists. */
>-                    COVERAGE_INC(conntrack_long_cleanup);
>-                    goto out;
>-                }
>-                break;
>-            } else {
>-                ovs_mutex_unlock(&conn->lock);
>-                conn_clean(ct, conn);
>-            }
>-            count++;
>+    RCULIST_FOR_EACH (conn, node, list) {
>+        if (conn_expired(conn, now)) {
>+            conn_clean(ct, conn);
>         }
>+
>+        count++;
>     }
> 
>-out:
>-    VLOG_DBG("conntrack cleanup %"PRIuSIZE" entries in %lld msec", count,
>-             time_msec() - now);
>-    ovs_mutex_unlock(&ct->ct_lock);
>-    return min_expiration;
>+    return count;
> }
> 
> /* Cleans up old connection entries from 'ct'.  Returns the time when the
>@@ -1595,11 +1581,26 @@ out:
> static long long
> conntrack_clean(struct conntrack *ct, long long now)
> {
>-    unsigned int n_conn_limit;
>+    long long next_wakeup = now + 30 * 1000;
>+    unsigned int n_conn_limit, i, count = 0;
>+    size_t clean_end;
>+
>     atomic_read_relaxed(&ct->n_conn_limit, &n_conn_limit);
>-    size_t clean_max = n_conn_limit > 10 ? n_conn_limit / 10 : 1;
>-    long long min_exp = ct_sweep(ct, now, clean_max);
>-    long long next_wakeup = MIN(min_exp, now + CT_DPIF_NETDEV_TP_MIN);
>+    clean_end = n_conn_limit / 64;
>+
>+    for (i = ct->next_sweep; i < EXP_LISTS; i++) {
>+        count += ct_sweep(ct, &ct->exp_lists[i], now);
>+
>+        if (count > clean_end) {
>+            next_wakeup = 0;
>+            break;
>+        }
>+    }
>+
>+    ct->next_sweep = (i < EXP_LISTS) ? i : 0;
>+
>+    VLOG_DBG("conntrack cleanup %"PRIu32" entries in %lld msec", count,
>+             time_msec() - now);
> 
>     return next_wakeup;
> }
>@@ -1628,6 +1629,7 @@ conntrack_clean(struct conntrack *ct, long long now)
> 
> static void *
> clean_thread_main(void *f_)
>+    OVS_NO_THREAD_SAFETY_ANALYSIS
> {
>     struct conntrack *ct = f_;
> 
>@@ -2539,7 +2541,7 @@ new_conn(struct conntrack *ct, struct dp_packet *pkt, struct conn_key *key,
> }
> 
> static void
>-delete_conn_cmn(struct conn *conn)
>+delete_conn__(struct conn *conn)
> {
>     free(conn->alg);
>     free(conn);
>@@ -2551,18 +2553,9 @@ delete_conn(struct conn *conn)
>     ovs_assert(conn->conn_type == CT_CONN_TYPE_DEFAULT);
>     ovs_mutex_destroy(&conn->lock);
>     free(conn->nat_conn);
>-    delete_conn_cmn(conn);
>+    delete_conn__(conn);
> }
> 
>-/* Only used by conn_clean_one(). */
>-static void
>-delete_conn_one(struct conn *conn)
>-{
>-    if (conn->conn_type == CT_CONN_TYPE_DEFAULT) {
>-        ovs_mutex_destroy(&conn->lock);
>-    }
>-    delete_conn_cmn(conn);
>-}
> 
> /* Convert a conntrack address 'a' into an IP address 'b' based on 'dl_type'.
>  *
>@@ -2714,6 +2707,11 @@ conntrack_dump_next(struct conntrack_dump *dump, struct ct_dpif_entry *entry)
>         }
>         struct conn *conn;
>         INIT_CONTAINER(conn, cm_node, cm_node);
>+
>+        if (conn_expired(conn, now)) {
>+            continue;
>+        }
>+
>         if ((!dump->filter_zone || conn->key.zone == dump->zone) &&
>             (conn->conn_type != CT_CONN_TYPE_UN_NAT)) {
>             conn_to_ct_dpif_entry(conn, entry, now);
>@@ -2735,13 +2733,15 @@ conntrack_flush(struct conntrack *ct, const uint16_t *zone)
> {
>     struct conn *conn;
> 
>-    ovs_mutex_lock(&ct->ct_lock);
>     CMAP_FOR_EACH (conn, cm_node, &ct->conns) {
>+        if (conn->conn_type != CT_CONN_TYPE_DEFAULT) {
>+            continue;
>+        }
>+
>         if (!zone || *zone == conn->key.zone) {
>-            conn_clean_one(ct, conn);
>+            conn_clean(ct, conn);
>         }
>     }
>-    ovs_mutex_unlock(&ct->ct_lock);
> 
>     return 0;
> }
>@@ -2756,7 +2756,6 @@ conntrack_flush_tuple(struct conntrack *ct, const struct ct_dpif_tuple *tuple,
> 
>     memset(&key, 0, sizeof(key));
>     tuple_to_conn_key(tuple, zone, &key);
>-    ovs_mutex_lock(&ct->ct_lock);
>     conn_lookup(ct, &key, time_msec(), &conn, NULL);
> 
>     if (conn && conn->conn_type == CT_CONN_TYPE_DEFAULT) {
>@@ -2766,7 +2765,6 @@ conntrack_flush_tuple(struct conntrack *ct, const struct ct_dpif_tuple *tuple,
>         error = ENOENT;
>     }
> 
>-    ovs_mutex_unlock(&ct->ct_lock);
>     return error;
> }
> 
>
>_______________________________________________
>dev mailing list
>dev@openvswitch.org
>https://mail.openvswitch.org/mailman/listinfo/ovs-dev
Paolo Valerio July 4, 2022, 8:43 a.m. UTC | #2
Hello wenxu,

thanks for having a look at it.

wenxu  <wenx05124561@163.com> writes:

> Hi Paolo,
>
> There are two small question.
> First the ct_lock lock/unlock as below maybe also can be dropped with this
> patch ?
>
>        ovs_mutex_lock(&ct->ct_lock);
>         if (!conn_lookup(ct, &ctx->key, now, NULL, NULL)) {
>             conn = conn_not_found(ct, pkt, ctx, commit, now, nat_action_info,
>                                   helper, alg_exp, ct_alg_ctl, tp_id);
>         }    
>         ovs_mutex_unlock(&ct->ct_lock);
>

The locked lookup/insertion should be kept, as it could lead e.g. to a
double insertion in the case we lookup without locking.
But you're right, in general, there should be room for improvement
because we could probably narrow the region we lock.
IMO, we should keep this out of this series, and maybe follow up
later, to avoid introducing too many changes at once.

>
>
> Second one  the zone_limit_lookup can be outof the conn_clean__ as below?
>
> static void
> conn_clean__(struct conntrack *ct, struct conn *conn)
>     OVS_REQUIRES(ct->ct_lock)
> {
>     if (conn->alg) {
>         expectation_clean(ct, &conn->key);
>     }
>
>     uint32_t hash = conn_key_hash(&conn->key, ct->hash_basis);
>     cmap_remove(&ct->conns, &conn->cm_node, hash);
>
>     struct zone_limit *zl = zone_limit_lookup(ct, conn->admit_zone);
>     if (zl && zl->czl.zone_limit_seq == conn->zone_limit_seq) {
>         atomic_count_dec(&zl->czl.count);
>     }
> }
>

do you mean it doesn't need to be under the ct_lock?
Although not a problem, it seems that keeping it under the ct_lock could
be avoided.
I guess we could easily follow up on this. Not sure whether it's worth a
respin now (for the code freeze) or, if this makes it, afterward.
I'd say, let's give Ilya and others the chance to have a look at this
and let's hear from them.

>
>
> BR
> wenxu
>
>
>
>
>
> At 2022-07-02 02:14:12, "Paolo Valerio" <pvalerio@redhat.com> wrote:
>>From: Gaetan Rivet <grive@u256.net>
>>
>>This patch aims to replace the expiration lists as, due to the way
>>they are used, besides being a source of contention, they have a known
>>issue when used with non-default policies for different zones that
>>could lead to retaining expired connections potentially for a long
>>time.
>>
>>This patch replaces them with an array of rculist used to distribute
>>all the newly created connections in order to, during the sweeping
>>phase, scan them without locking, and evict the expired connections
>>only locking during the actual removal.  This allows to reduce the
>>contention introduced by the pushback performed at every packet
>>update, also solving the issue related to zones and timeout policies.
>>
>>Signed-off-by: Gaetan Rivet <grive@u256.net>
>>Co-authored-by: Paolo Valerio <pvalerio@redhat.com>
>>Signed-off-by: Paolo Valerio <pvalerio@redhat.com>
>>---
>>v6:
>>- minor function renaming
>>- removed conn->lock in conn_clean() as this was unneeded.
>>- minor commit message rephrase
>>---
>> lib/conntrack-private.h |   84 +++++++++++++++++---------
>> lib/conntrack-tp.c      |   44 +-------------
>> lib/conntrack.c         |  152 +++++++++++++++++++++++------------------------
>> 3 files changed, 133 insertions(+), 147 deletions(-)
>>
>>diff --git a/lib/conntrack-private.h b/lib/conntrack-private.h
>>index 34c688821..676f58d83 100644
>>--- a/lib/conntrack-private.h
>>+++ b/lib/conntrack-private.h
>>@@ -29,6 +29,7 @@
>> #include "openvswitch/list.h"
>> #include "openvswitch/types.h"
>> #include "packets.h"
>>+#include "rculist.h"
>> #include "unaligned.h"
>> #include "dp-packet.h"
>>
>>@@ -86,6 +87,31 @@ struct alg_exp_node {
>>     bool nat_rpl_dst;
>> };
>>
>>+/* Timeouts: all the possible timeout states passed to update_expiration()
>>+ * are listed here. The name will be prefix by CT_TM_ and the value is in
>>+ * milliseconds */
>>+#define CT_TIMEOUTS \
>>+    CT_TIMEOUT(TCP_FIRST_PACKET) \
>>+    CT_TIMEOUT(TCP_OPENING) \
>>+    CT_TIMEOUT(TCP_ESTABLISHED) \
>>+    CT_TIMEOUT(TCP_CLOSING) \
>>+    CT_TIMEOUT(TCP_FIN_WAIT) \
>>+    CT_TIMEOUT(TCP_CLOSED) \
>>+    CT_TIMEOUT(OTHER_FIRST) \
>>+    CT_TIMEOUT(OTHER_MULTIPLE) \
>>+    CT_TIMEOUT(OTHER_BIDIR) \
>>+    CT_TIMEOUT(ICMP_FIRST) \
>>+    CT_TIMEOUT(ICMP_REPLY)
>>+
>>+enum ct_timeout {
>>+#define CT_TIMEOUT(NAME) CT_TM_##NAME,
>>+    CT_TIMEOUTS
>>+#undef CT_TIMEOUT
>>+    N_CT_TM
>>+};
>>+
>>+#define EXP_LISTS 100
>>+
>> enum OVS_PACKED_ENUM ct_conn_type {
>>     CT_CONN_TYPE_DEFAULT,
>>     CT_CONN_TYPE_UN_NAT,
>>@@ -96,11 +122,16 @@ struct conn {
>>     struct conn_key key;
>>     struct conn_key rev_key;
>>     struct conn_key parent_key; /* Only used for orig_tuple support. */
>>-    struct ovs_list exp_node;
>>     struct cmap_node cm_node;
>>     uint16_t nat_action;
>>     char *alg;
>>     struct conn *nat_conn; /* The NAT 'conn' context, if there is one. */
>>+    atomic_flag reclaimed; /* False during the lifetime of the connection,
>>+                            * True as soon as a thread has started freeing
>>+                            * its memory. */
>>+
>>+    /* Inserted once by a PMD, then managed by the 'ct_clean' thread. */
>>+    struct rculist node;
>>
>>     /* Mutable data. */
>>     struct ovs_mutex lock; /* Guards all mutable fields. */
>>@@ -116,7 +147,6 @@ struct conn {
>>     /* Mutable data. */
>>     bool seq_skew_dir; /* TCP sequence skew direction due to NATTing of FTP
>>                         * control messages; true if reply direction. */
>>-    bool cleaned; /* True if cleaned from expiry lists. */
>>
>>     /* Immutable data. */
>>     bool alg_related; /* True if alg data connection. */
>>@@ -132,22 +162,6 @@ enum ct_update_res {
>>     CT_UPDATE_VALID_NEW,
>> };
>>
>>-/* Timeouts: all the possible timeout states passed to update_expiration()
>>- * are listed here. The name will be prefix by CT_TM_ and the value is in
>>- * milliseconds */
>>-#define CT_TIMEOUTS \
>>-    CT_TIMEOUT(TCP_FIRST_PACKET) \
>>-    CT_TIMEOUT(TCP_OPENING) \
>>-    CT_TIMEOUT(TCP_ESTABLISHED) \
>>-    CT_TIMEOUT(TCP_CLOSING) \
>>-    CT_TIMEOUT(TCP_FIN_WAIT) \
>>-    CT_TIMEOUT(TCP_CLOSED) \
>>-    CT_TIMEOUT(OTHER_FIRST) \
>>-    CT_TIMEOUT(OTHER_MULTIPLE) \
>>-    CT_TIMEOUT(OTHER_BIDIR) \
>>-    CT_TIMEOUT(ICMP_FIRST) \
>>-    CT_TIMEOUT(ICMP_REPLY)
>>-
>> #define NAT_ACTION_SNAT_ALL (NAT_ACTION_SRC | NAT_ACTION_SRC_PORT)
>> #define NAT_ACTION_DNAT_ALL (NAT_ACTION_DST | NAT_ACTION_DST_PORT)
>>
>>@@ -181,22 +195,17 @@ enum ct_ephemeral_range {
>> #define FOR_EACH_PORT_IN_RANGE(curr, min, max) \
>>     FOR_EACH_PORT_IN_RANGE__(curr, min, max, OVS_JOIN(idx, __COUNTER__))
>>
>>-enum ct_timeout {
>>-#define CT_TIMEOUT(NAME) CT_TM_##NAME,
>>-    CT_TIMEOUTS
>>-#undef CT_TIMEOUT
>>-    N_CT_TM
>>-};
>>-
>> struct conntrack {
>>     struct ovs_mutex ct_lock; /* Protects 2 following fields. */
>>     struct cmap conns OVS_GUARDED;
>>-    struct ovs_list exp_lists[N_CT_TM] OVS_GUARDED;
>>+    struct rculist exp_lists[EXP_LISTS];
>>     struct cmap zone_limits OVS_GUARDED;
>>     struct cmap timeout_policies OVS_GUARDED;
>>     uint32_t hash_basis; /* Salt for hashing a connection key. */
>>     pthread_t clean_thread; /* Periodically cleans up connection tracker. */
>>     struct latch clean_thread_exit; /* To destroy the 'clean_thread'. */
>>+    atomic_uint ct_next_list;
>>+    unsigned int next_sweep;
>>
>>     /* Counting connections. */
>>     atomic_count n_conn; /* Number of connections currently tracked. */
>>@@ -216,8 +225,8 @@ struct conntrack {
>> };
>>
>> /* Lock acquisition order:
>>- *    1. 'ct_lock'
>>- *    2. 'conn->lock'
>>+ *    1. 'conn->lock'
>>+ *    2. 'ct_lock'
>>  *    3. 'resources_lock'
>>  */
>>
>>@@ -237,4 +246,23 @@ struct ct_l4_proto {
>>                                struct ct_dpif_protoinfo *);
>> };
>>
>>+static unsigned int
>>+ct_next_list(struct conntrack *ct)
>>+{
>>+    unsigned int old;
>>+
>>+    atomic_add_relaxed(&ct->ct_next_list, 1u, &old);
>>+
>>+    return old % EXP_LISTS;
>>+}
>>+
>>+static inline void
>>+conn_expire_push_front(struct conntrack *ct, struct conn *conn)
>>+    OVS_REQUIRES(ct->ct_lock)
>>+{
>>+    unsigned int next = ct_next_list(ct);
>>+
>>+    rculist_push_front(&ct->exp_lists[next], &conn->node);
>>+}
>>+
>> #endif /* conntrack-private.h */
>>diff --git a/lib/conntrack-tp.c b/lib/conntrack-tp.c
>>index c2245038b..7b8f9007b 100644
>>--- a/lib/conntrack-tp.c
>>+++ b/lib/conntrack-tp.c
>>@@ -236,27 +236,6 @@ tm_to_ct_dpif_tp(enum ct_timeout tm)
>>     return CT_DPIF_TP_ATTR_MAX;
>> }
>>
>>-static void
>>-conn_update_expiration__(struct conntrack *ct, struct conn *conn,
>>-                         enum ct_timeout tm, long long now,
>>-                         uint32_t tp_value)
>>-    OVS_REQUIRES(conn->lock)
>>-{
>>-    ovs_mutex_unlock(&conn->lock);
>>-
>>-    ovs_mutex_lock(&ct->ct_lock);
>>-    ovs_mutex_lock(&conn->lock);
>>-    if (!conn->cleaned) {
>>-        conn->expiration = now + tp_value * 1000;
>>-        ovs_list_remove(&conn->exp_node);
>>-        ovs_list_push_back(&ct->exp_lists[tm], &conn->exp_node);
>>-    }
>>-    ovs_mutex_unlock(&conn->lock);
>>-    ovs_mutex_unlock(&ct->ct_lock);
>>-
>>-    ovs_mutex_lock(&conn->lock);
>>-}
>>-
>> /* The conn entry lock must be held on entry and exit. */
>> void
>> conn_update_expiration(struct conntrack *ct, struct conn *conn,
>>@@ -266,41 +245,22 @@ conn_update_expiration(struct conntrack *ct, struct conn *conn,
>>     struct timeout_policy *tp;
>>     uint32_t val;
>>
>>-    ovs_mutex_unlock(&conn->lock);
>>-
>>-    ovs_mutex_lock(&ct->ct_lock);
>>-    ovs_mutex_lock(&conn->lock);
>>     tp = timeout_policy_lookup(ct, conn->tp_id);
>>     if (tp) {
>>         val = tp->policy.attrs[tm_to_ct_dpif_tp(tm)];
>>     } else {
>>         val = ct_dpif_netdev_tp_def[tm_to_ct_dpif_tp(tm)];
>>     }
>>-    ovs_mutex_unlock(&conn->lock);
>>-    ovs_mutex_unlock(&ct->ct_lock);
>>-
>>-    ovs_mutex_lock(&conn->lock);
>>     VLOG_DBG_RL(&rl, "Update timeout %s zone=%u with policy id=%d "
>>                 "val=%u sec.",
>>                 ct_timeout_str[tm], conn->key.zone, conn->tp_id, val);
>>
>>-    conn_update_expiration__(ct, conn, tm, now, val);
>>+    conn->expiration = now + val * 1000;
>> }
>>
>>-static void
>>-conn_init_expiration__(struct conntrack *ct, struct conn *conn,
>>-                       enum ct_timeout tm, long long now,
>>-                       uint32_t tp_value)
>>-{
>>-    conn->expiration = now + tp_value * 1000;
>>-    ovs_list_push_back(&ct->exp_lists[tm], &conn->exp_node);
>>-}
>>-
>>-/* ct_lock must be held. */
>> void
>> conn_init_expiration(struct conntrack *ct, struct conn *conn,
>>                      enum ct_timeout tm, long long now)
>>-    OVS_REQUIRES(ct->ct_lock)
>> {
>>     struct timeout_policy *tp;
>>     uint32_t val;
>>@@ -315,5 +275,5 @@ conn_init_expiration(struct conntrack *ct, struct conn *conn,
>>     VLOG_DBG_RL(&rl, "Init timeout %s zone=%u with policy id=%d val=%u sec.",
>>                 ct_timeout_str[tm], conn->key.zone, conn->tp_id, val);
>>
>>-    conn_init_expiration__(ct, conn, tm, now, val);
>>+    conn->expiration = now + val * 1000;
>> }
>>diff --git a/lib/conntrack.c b/lib/conntrack.c
>>index 38ddc9b91..819c356c1 100644
>>--- a/lib/conntrack.c
>>+++ b/lib/conntrack.c
>>@@ -39,12 +39,12 @@
>> #include "ovs-thread.h"
>> #include "openvswitch/poll-loop.h"
>> #include "random.h"
>>+#include "rculist.h"
>> #include "timeval.h"
>>
>> VLOG_DEFINE_THIS_MODULE(conntrack);
>>
>> COVERAGE_DEFINE(conntrack_full);
>>-COVERAGE_DEFINE(conntrack_long_cleanup);
>> COVERAGE_DEFINE(conntrack_l3csum_err);
>> COVERAGE_DEFINE(conntrack_l4csum_err);
>> COVERAGE_DEFINE(conntrack_lookup_natted_miss);
>>@@ -94,9 +94,8 @@ static bool valid_new(struct dp_packet *pkt, struct conn_key *);
>> static struct conn *new_conn(struct conntrack *ct, struct dp_packet *pkt,
>>                              struct conn_key *, long long now,
>>                              uint32_t tp_id);
>>-static void delete_conn_cmn(struct conn *);
>>+static void delete_conn__(struct conn *);
>> static void delete_conn(struct conn *);
>>-static void delete_conn_one(struct conn *conn);
>> static enum ct_update_res conn_update(struct conntrack *ct, struct conn *conn,
>>                                       struct dp_packet *pkt,
>>                                       struct conn_lookup_ctx *ctx,
>>@@ -309,7 +308,7 @@ conntrack_init(void)
>>     ovs_mutex_lock(&ct->ct_lock);
>>     cmap_init(&ct->conns);
>>     for (unsigned i = 0; i < ARRAY_SIZE(ct->exp_lists); i++) {
>>-        ovs_list_init(&ct->exp_lists[i]);
>>+        rculist_init(&ct->exp_lists[i]);
>>     }
>>     cmap_init(&ct->zone_limits);
>>     ct->zone_limit_seq = 0;
>>@@ -319,6 +318,7 @@ conntrack_init(void)
>>     atomic_count_init(&ct->n_conn, 0);
>>     atomic_init(&ct->n_conn_limit, DEFAULT_N_CONN_LIMIT);
>>     atomic_init(&ct->tcp_seq_chk, true);
>>+    atomic_init(&ct->ct_next_list, 0);
>>     latch_init(&ct->clean_thread_exit);
>>     ct->clean_thread = ovs_thread_create("ct_clean", clean_thread_main, ct);
>>     ct->ipf = ipf_init();
>>@@ -467,7 +467,7 @@ zone_limit_delete(struct conntrack *ct, uint16_t zone)
>> }
>>
>> static void
>>-conn_clean_cmn(struct conntrack *ct, struct conn *conn)
>>+conn_clean__(struct conntrack *ct, struct conn *conn)
>>     OVS_REQUIRES(ct->ct_lock)
>> {
>>     if (conn->alg) {
>>@@ -487,32 +487,34 @@ conn_clean_cmn(struct conntrack *ct, struct conn *conn)
>>  * removes the associated nat 'conn' from the lookup datastructures. */
>> static void
>> conn_clean(struct conntrack *ct, struct conn *conn)
>>-    OVS_REQUIRES(ct->ct_lock)
>>+    OVS_EXCLUDED(conn->lock, ct->ct_lock)
>> {
>>     ovs_assert(conn->conn_type == CT_CONN_TYPE_DEFAULT);
>>
>>-    conn_clean_cmn(ct, conn);
>>+    if (atomic_flag_test_and_set(&conn->reclaimed)) {
>>+        return;
>>+    }
>>+
>>+    ovs_mutex_lock(&ct->ct_lock);
>>+    conn_clean__(ct, conn);
>>     if (conn->nat_conn) {
>>         uint32_t hash = conn_key_hash(&conn->nat_conn->key, ct->hash_basis);
>>         cmap_remove(&ct->conns, &conn->nat_conn->cm_node, hash);
>>     }
>>-    ovs_list_remove(&conn->exp_node);
>>-    conn->cleaned = true;
>>+
>>+    rculist_remove(&conn->node);
>>+    ovs_mutex_unlock(&ct->ct_lock);
>>+
>>     ovsrcu_postpone(delete_conn, conn);
>>     atomic_count_dec(&ct->n_conn);
>> }
>>
>> static void
>>-conn_clean_one(struct conntrack *ct, struct conn *conn)
>>-    OVS_REQUIRES(ct->ct_lock)
>>+conn_force_expire(struct conn *conn)
>> {
>>-    conn_clean_cmn(ct, conn);
>>-    if (conn->conn_type == CT_CONN_TYPE_DEFAULT) {
>>-        ovs_list_remove(&conn->exp_node);
>>-        conn->cleaned = true;
>>-        atomic_count_dec(&ct->n_conn);
>>-    }
>>-    ovsrcu_postpone(delete_conn_one, conn);
>>+    ovs_mutex_lock(&conn->lock);
>>+    conn->expiration = 0;
>>+    ovs_mutex_unlock(&conn->lock);
>> }
>>
>> /* Destroys the connection tracker 'ct' and frees all the allocated memory.
>>@@ -522,15 +524,16 @@ void
>> conntrack_destroy(struct conntrack *ct)
>> {
>>     struct conn *conn;
>>+
>>     latch_set(&ct->clean_thread_exit);
>>     pthread_join(ct->clean_thread, NULL);
>>     latch_destroy(&ct->clean_thread_exit);
>>
>>-    ovs_mutex_lock(&ct->ct_lock);
>>-    CMAP_FOR_EACH (conn, cm_node, &ct->conns) {
>>-        conn_clean_one(ct, conn);
>>+    for (unsigned i = 0; i < EXP_LISTS; i++) {
>>+        RCULIST_FOR_EACH (conn, node, &ct->exp_lists[i]) {
>>+            conn_clean(ct, conn);
>>+        }
>>     }
>>-    cmap_destroy(&ct->conns);
>>
>>     struct zone_limit *zl;
>>     CMAP_FOR_EACH (zl, node, &ct->zone_limits) {
>>@@ -539,7 +542,6 @@ conntrack_destroy(struct conntrack *ct)
>>         cmap_remove(&ct->zone_limits, &zl->node, hash);
>>         ovsrcu_postpone(free, zl);
>>     }
>>-    cmap_destroy(&ct->zone_limits);
>>
>>     struct timeout_policy *tp;
>>     CMAP_FOR_EACH (tp, node, &ct->timeout_policies) {
>>@@ -548,6 +550,11 @@ conntrack_destroy(struct conntrack *ct)
>>         cmap_remove(&ct->timeout_policies, &tp->node, hash);
>>         ovsrcu_postpone(free, tp);
>>     }
>>+
>>+    ovs_mutex_lock(&ct->ct_lock);
>>+
>>+    cmap_destroy(&ct->conns);
>>+    cmap_destroy(&ct->zone_limits);
>>     cmap_destroy(&ct->timeout_policies);
>>
>>     ovs_mutex_unlock(&ct->ct_lock);
>>@@ -1087,7 +1094,9 @@ conn_not_found(struct conntrack *ct, struct dp_packet *pkt,
>>         nc->nat_conn = nat_conn;
>>         ovs_mutex_init_adaptive(&nc->lock);
>>         nc->conn_type = CT_CONN_TYPE_DEFAULT;
>>+        atomic_flag_clear(&nc->reclaimed);
>>         cmap_insert(&ct->conns, &nc->cm_node, ctx->hash);
>>+        conn_expire_push_front(ct, nc);
>>         atomic_count_inc(&ct->n_conn);
>>         ctx->conn = nc; /* For completeness. */
>>         if (zl) {
>>@@ -1108,8 +1117,7 @@ conn_not_found(struct conntrack *ct, struct dp_packet *pkt,
>>      * can limit DoS impact. */
>> nat_res_exhaustion:
>>     free(nat_conn);
>>-    ovs_list_remove(&nc->exp_node);
>>-    delete_conn_cmn(nc);
>>+    delete_conn__(nc);
>>     static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
>>     VLOG_WARN_RL(&rl, "Unable to NAT due to tuple space exhaustion - "
>>                  "if DoS attack, use firewalling and/or zone partitioning.");
>>@@ -1148,11 +1156,9 @@ conn_update_state(struct conntrack *ct, struct dp_packet *pkt,
>>             pkt->md.ct_state = CS_INVALID;
>>             break;
>>         case CT_UPDATE_NEW:
>>-            ovs_mutex_lock(&ct->ct_lock);
>>             if (conn_lookup(ct, &conn->key, now, NULL, NULL)) {
>>-                conn_clean(ct, conn);
>>+                conn_force_expire(conn);
>>             }
>>-            ovs_mutex_unlock(&ct->ct_lock);
>>             create_new_conn = true;
>>             break;
>>         case CT_UPDATE_VALID_NEW:
>>@@ -1363,11 +1369,9 @@ process_one(struct conntrack *ct, struct dp_packet *pkt,
>>
>>     /* Delete found entry if in wrong direction. 'force' implies commit. */
>>     if (OVS_UNLIKELY(force && ctx->reply && conn)) {
>>-        ovs_mutex_lock(&ct->ct_lock);
>>         if (conn_lookup(ct, &conn->key, now, NULL, NULL)) {
>>-            conn_clean(ct, conn);
>>+            conn_force_expire(conn);
>>         }
>>-        ovs_mutex_unlock(&ct->ct_lock);
>>         conn = NULL;
>>     }
>>
>>@@ -1553,39 +1557,21 @@ set_label(struct dp_packet *pkt, struct conn *conn,
>>  * LLONG_MAX if 'ctb' is empty.  The return value might be smaller than 'now',
>>  * if 'limit' is reached */
>> static long long
>>-ct_sweep(struct conntrack *ct, long long now, size_t limit)
>>+ct_sweep(struct conntrack *ct, struct rculist *list, long long now)
>>+    OVS_NO_THREAD_SAFETY_ANALYSIS
>> {
>>     struct conn *conn;
>>-    long long min_expiration = LLONG_MAX;
>>     size_t count = 0;
>>
>>-    ovs_mutex_lock(&ct->ct_lock);
>>-
>>-    for (unsigned i = 0; i < N_CT_TM; i++) {
>>-        LIST_FOR_EACH_SAFE (conn, exp_node, &ct->exp_lists[i]) {
>>-            ovs_mutex_lock(&conn->lock);
>>-            if (now < conn->expiration || count >= limit) {
>>-                min_expiration = MIN(min_expiration, conn->expiration);
>>-                ovs_mutex_unlock(&conn->lock);
>>-                if (count >= limit) {
>>-                    /* Do not check other lists. */
>>-                    COVERAGE_INC(conntrack_long_cleanup);
>>-                    goto out;
>>-                }
>>-                break;
>>-            } else {
>>-                ovs_mutex_unlock(&conn->lock);
>>-                conn_clean(ct, conn);
>>-            }
>>-            count++;
>>+    RCULIST_FOR_EACH (conn, node, list) {
>>+        if (conn_expired(conn, now)) {
>>+            conn_clean(ct, conn);
>>         }
>>+
>>+        count++;
>>     }
>>
>>-out:
>>-    VLOG_DBG("conntrack cleanup %"PRIuSIZE" entries in %lld msec", count,
>>-             time_msec() - now);
>>-    ovs_mutex_unlock(&ct->ct_lock);
>>-    return min_expiration;
>>+    return count;
>> }
>>
>> /* Cleans up old connection entries from 'ct'.  Returns the time when the
>>@@ -1595,11 +1581,26 @@ out:
>> static long long
>> conntrack_clean(struct conntrack *ct, long long now)
>> {
>>-    unsigned int n_conn_limit;
>>+    long long next_wakeup = now + 30 * 1000;
>>+    unsigned int n_conn_limit, i, count = 0;
>>+    size_t clean_end;
>>+
>>     atomic_read_relaxed(&ct->n_conn_limit, &n_conn_limit);
>>-    size_t clean_max = n_conn_limit > 10 ? n_conn_limit / 10 : 1;
>>-    long long min_exp = ct_sweep(ct, now, clean_max);
>>-    long long next_wakeup = MIN(min_exp, now + CT_DPIF_NETDEV_TP_MIN);
>>+    clean_end = n_conn_limit / 64;
>>+
>>+    for (i = ct->next_sweep; i < EXP_LISTS; i++) {
>>+        count += ct_sweep(ct, &ct->exp_lists[i], now);
>>+
>>+        if (count > clean_end) {
>>+            next_wakeup = 0;
>>+            break;
>>+        }
>>+    }
>>+
>>+    ct->next_sweep = (i < EXP_LISTS) ? i : 0;
>>+
>>+    VLOG_DBG("conntrack cleanup %"PRIu32" entries in %lld msec", count,
>>+             time_msec() - now);
>>
>>     return next_wakeup;
>> }
>>@@ -1628,6 +1629,7 @@ conntrack_clean(struct conntrack *ct, long long now)
>>
>> static void *
>> clean_thread_main(void *f_)
>>+    OVS_NO_THREAD_SAFETY_ANALYSIS
>> {
>>     struct conntrack *ct = f_;
>>
>>@@ -2539,7 +2541,7 @@ new_conn(struct conntrack *ct, struct dp_packet *pkt, struct conn_key *key,
>> }
>>
>> static void
>>-delete_conn_cmn(struct conn *conn)
>>+delete_conn__(struct conn *conn)
>> {
>>     free(conn->alg);
>>     free(conn);
>>@@ -2551,18 +2553,9 @@ delete_conn(struct conn *conn)
>>     ovs_assert(conn->conn_type == CT_CONN_TYPE_DEFAULT);
>>     ovs_mutex_destroy(&conn->lock);
>>     free(conn->nat_conn);
>>-    delete_conn_cmn(conn);
>>+    delete_conn__(conn);
>> }
>>
>>-/* Only used by conn_clean_one(). */
>>-static void
>>-delete_conn_one(struct conn *conn)
>>-{
>>-    if (conn->conn_type == CT_CONN_TYPE_DEFAULT) {
>>-        ovs_mutex_destroy(&conn->lock);
>>-    }
>>-    delete_conn_cmn(conn);
>>-}
>>
>> /* Convert a conntrack address 'a' into an IP address 'b' based on 'dl_type'.
>>  *
>>@@ -2714,6 +2707,11 @@ conntrack_dump_next(struct conntrack_dump *dump, struct ct_dpif_entry *entry)
>>         }
>>         struct conn *conn;
>>         INIT_CONTAINER(conn, cm_node, cm_node);
>>+
>>+        if (conn_expired(conn, now)) {
>>+            continue;
>>+        }
>>+
>>         if ((!dump->filter_zone || conn->key.zone == dump->zone) &&
>>             (conn->conn_type != CT_CONN_TYPE_UN_NAT)) {
>>             conn_to_ct_dpif_entry(conn, entry, now);
>>@@ -2735,13 +2733,15 @@ conntrack_flush(struct conntrack *ct, const uint16_t *zone)
>> {
>>     struct conn *conn;
>>
>>-    ovs_mutex_lock(&ct->ct_lock);
>>     CMAP_FOR_EACH (conn, cm_node, &ct->conns) {
>>+        if (conn->conn_type != CT_CONN_TYPE_DEFAULT) {
>>+            continue;
>>+        }
>>+
>>         if (!zone || *zone == conn->key.zone) {
>>-            conn_clean_one(ct, conn);
>>+            conn_clean(ct, conn);
>>         }
>>     }
>>-    ovs_mutex_unlock(&ct->ct_lock);
>>
>>     return 0;
>> }
>>@@ -2756,7 +2756,6 @@ conntrack_flush_tuple(struct conntrack *ct, const struct ct_dpif_tuple *tuple,
>>
>>     memset(&key, 0, sizeof(key));
>>     tuple_to_conn_key(tuple, zone, &key);
>>-    ovs_mutex_lock(&ct->ct_lock);
>>     conn_lookup(ct, &key, time_msec(), &conn, NULL);
>>
>>     if (conn && conn->conn_type == CT_CONN_TYPE_DEFAULT) {
>>@@ -2766,7 +2765,6 @@ conntrack_flush_tuple(struct conntrack *ct, const struct ct_dpif_tuple *tuple,
>>         error = ENOENT;
>>     }
>>
>>-    ovs_mutex_unlock(&ct->ct_lock);
>>     return error;
>> }
>>
>>
>>_______________________________________________
>>dev mailing list
>>dev@openvswitch.org
>>https://mail.openvswitch.org/mailman/listinfo/ovs-dev
wenxu July 4, 2022, 2:56 p.m. UTC | #3
At 2022-07-04 16:43:20, "Paolo Valerio" <pvalerio@redhat.com> wrote:
>Hello wenxu,
>
>thanks for having a look at it.
>
>wenxu  <wenx05124561@163.com> writes:
>
>> Hi Paolo,
>>
>> There are two small question.
>> First the ct_lock lock/unlock as below maybe also can be dropped with this
>> patch ?
>>
>>        ovs_mutex_lock(&ct->ct_lock);
>>         if (!conn_lookup(ct, &ctx->key, now, NULL, NULL)) {
>>             conn = conn_not_found(ct, pkt, ctx, commit, now, nat_action_info,
>>                                   helper, alg_exp, ct_alg_ctl, tp_id);
>>         }    
>>         ovs_mutex_unlock(&ct->ct_lock);
>>
>
>The locked lookup/insertion should be kept, as it could lead e.g. to a

>double insertion in the case we lookup without locking.
Yes, What I mean is narrow the region of the lock. Only the insertion
need this lock.


>But you're right, in general, there should be room for improvement
>because we could probably narrow the region we lock.
>IMO, we should keep this out of this series, and maybe follow up
>later, to avoid introducing too many changes at once.
>
Paolo Valerio July 4, 2022, 6:05 p.m. UTC | #4
wenxu  <wenx05124561@163.com> writes:

> At 2022-07-04 16:43:20, "Paolo Valerio" <pvalerio@redhat.com> wrote:
>>Hello wenxu,
>>
>>thanks for having a look at it.
>>
>>wenxu  <wenx05124561@163.com> writes:
>>
>>> Hi Paolo,
>>>
>>> There are two small question.
>>> First the ct_lock lock/unlock as below maybe also can be dropped with this
>>> patch ?
>>>
>>>        ovs_mutex_lock(&ct->ct_lock);
>>>         if (!conn_lookup(ct, &ctx->key, now, NULL, NULL)) {
>>>             conn = conn_not_found(ct, pkt, ctx, commit, now, nat_action_info,
>>>                                   helper, alg_exp, ct_alg_ctl, tp_id);
>>>         }
>>>         ovs_mutex_unlock(&ct->ct_lock);
>>>
>>
>>The locked lookup/insertion should be kept, as it could lead e.g. to a
>>double insertion in the case we lookup without locking.
> Yes, What I mean is narrow the region of the lock. Only the insertion
> need this lock.
>

Just to clarify what I was referring to in my previous email [1].

[1] http://patchwork.ozlabs.org/project/openvswitch/patch/9ae8ad243da85be4853b90eccc958600dace7726.1623786081.git.grive@u256.net/#2728678

>>But you're right, in general, there should be room for improvement
>>because we could probably narrow the region we lock.
>>IMO, we should keep this out of this series, and maybe follow up
>>later, to avoid introducing too many changes at once.
>>
Paolo Valerio July 10, 2022, 8:03 p.m. UTC | #5
Paolo Valerio <pvalerio@redhat.com> writes:

> From: Gaetan Rivet <grive@u256.net>
>
> This patch aims to replace the expiration lists as, due to the way
> they are used, besides being a source of contention, they have a known
> issue when used with non-default policies for different zones that
> could lead to retaining expired connections potentially for a long
> time.
>
> This patch replaces them with an array of rculist used to distribute
> all the newly created connections in order to, during the sweeping
> phase, scan them without locking, and evict the expired connections
> only locking during the actual removal.  This allows to reduce the
> contention introduced by the pushback performed at every packet
> update, also solving the issue related to zones and timeout policies.
>
> Signed-off-by: Gaetan Rivet <grive@u256.net>
> Co-authored-by: Paolo Valerio <pvalerio@redhat.com>
> Signed-off-by: Paolo Valerio <pvalerio@redhat.com>
> ---
> v6:
> - minor function renaming
> - removed conn->lock in conn_clean() as this was unneeded.
> - minor commit message rephrase
> ---

I have a small incremental diff that changes minor things:

- changes/remove some comments
- next_wake changed from 30 to 20
- removes MAX() as next_wake is always used

I'm planning to send it by the end of Monday so that if any feedback
comes can be folded into a single re-spin.

diff --git a/lib/conntrack.c b/lib/conntrack.c
index 819c356c1..66a44da2e 100644
--- a/lib/conntrack.c
+++ b/lib/conntrack.c
@@ -1552,11 +1552,7 @@ set_label(struct dp_packet *pkt, struct conn *conn,
 }
 
 
-/* Delete the expired connections from 'ctb', up to 'limit'. Returns the
- * earliest expiration time among the remaining connections in 'ctb'.  Returns
- * LLONG_MAX if 'ctb' is empty.  The return value might be smaller than 'now',
- * if 'limit' is reached */
-static long long
+static size_t
 ct_sweep(struct conntrack *ct, struct rculist *list, long long now)
     OVS_NO_THREAD_SAFETY_ANALYSIS
 {
@@ -1574,16 +1570,15 @@ ct_sweep(struct conntrack *ct, struct rculist *list, long long now)
     return count;
 }
 
-/* Cleans up old connection entries from 'ct'.  Returns the time when the
- * next expiration might happen.  The return value might be smaller than
- * 'now', meaning that an internal limit has been reached, and some expired
- * connections have not been deleted. */
+/* Cleans up old connection entries from 'ct'.  Returns the time
+ * when the next wake will happen. The return value might be zero,
+ * meaning that an internal limit has been reached. */
 static long long
 conntrack_clean(struct conntrack *ct, long long now)
 {
-    long long next_wakeup = now + 30 * 1000;
-    unsigned int n_conn_limit, i, count = 0;
-    size_t clean_end;
+    long long next_wakeup = now + 20 * 1000;
+    unsigned int n_conn_limit, i;
+    size_t clean_end, count = 0;
 
     atomic_read_relaxed(&ct->n_conn_limit, &n_conn_limit);
     clean_end = n_conn_limit / 64;
@@ -1599,7 +1594,7 @@ conntrack_clean(struct conntrack *ct, long long now)
 
     ct->next_sweep = (i < EXP_LISTS) ? i : 0;
 
-    VLOG_DBG("conntrack cleanup %"PRIu32" entries in %lld msec", count,
+    VLOG_DBG("conntrack cleanup %"PRIuSIZE" entries in %lld msec", count,
              time_msec() - now);
 
     return next_wakeup;
@@ -1608,24 +1603,8 @@ conntrack_clean(struct conntrack *ct, long long now)
 /* Cleanup:
  *
  * We must call conntrack_clean() periodically.  conntrack_clean() return
- * value gives an hint on when the next cleanup must be done (either because
- * there is an actual connection that expires, or because a new connection
- * might be created with the minimum timeout).
- *
- * The logic below has two goals:
- *
- * - We want to reduce the number of wakeups and batch connection cleanup
- *   when the load is not very high.  CT_CLEAN_INTERVAL ensures that if we
- *   are coping with the current cleanup tasks, then we wait at least
- *   5 seconds to do further cleanup.
- *
- * - We don't want to keep the map locked too long, as we might prevent
- *   traffic from flowing.  CT_CLEAN_MIN_INTERVAL ensures that if cleanup is
- *   behind, there is at least some 200ms blocks of time when the map will be
- *   left alone, so the datapath can operate unhindered.
- */
-#define CT_CLEAN_INTERVAL 5000 /* 5 seconds */
-#define CT_CLEAN_MIN_INTERVAL 200  /* 0.2 seconds */
+ * value gives an hint on when the next cleanup must be done. */
+#define CT_CLEAN_MIN_INTERVAL_MS 200
 
 static void *
 clean_thread_main(void *f_)
@@ -1639,9 +1618,9 @@ clean_thread_main(void *f_)
         next_wake = conntrack_clean(ct, now);
 
         if (next_wake < now) {
-            poll_timer_wait_until(now + CT_CLEAN_MIN_INTERVAL);
+            poll_timer_wait_until(now + CT_CLEAN_MIN_INTERVAL_MS);
         } else {
-            poll_timer_wait_until(MAX(next_wake, now + CT_CLEAN_INTERVAL));
+            poll_timer_wait_until(next_wake);
         }
         latch_wait(&ct->clean_thread_exit);
         poll_block();
diff mbox series

Patch

diff --git a/lib/conntrack-private.h b/lib/conntrack-private.h
index 34c688821..676f58d83 100644
--- a/lib/conntrack-private.h
+++ b/lib/conntrack-private.h
@@ -29,6 +29,7 @@ 
 #include "openvswitch/list.h"
 #include "openvswitch/types.h"
 #include "packets.h"
+#include "rculist.h"
 #include "unaligned.h"
 #include "dp-packet.h"
 
@@ -86,6 +87,31 @@  struct alg_exp_node {
     bool nat_rpl_dst;
 };
 
+/* Timeouts: all the possible timeout states passed to update_expiration()
+ * are listed here. The name will be prefix by CT_TM_ and the value is in
+ * milliseconds */
+#define CT_TIMEOUTS \
+    CT_TIMEOUT(TCP_FIRST_PACKET) \
+    CT_TIMEOUT(TCP_OPENING) \
+    CT_TIMEOUT(TCP_ESTABLISHED) \
+    CT_TIMEOUT(TCP_CLOSING) \
+    CT_TIMEOUT(TCP_FIN_WAIT) \
+    CT_TIMEOUT(TCP_CLOSED) \
+    CT_TIMEOUT(OTHER_FIRST) \
+    CT_TIMEOUT(OTHER_MULTIPLE) \
+    CT_TIMEOUT(OTHER_BIDIR) \
+    CT_TIMEOUT(ICMP_FIRST) \
+    CT_TIMEOUT(ICMP_REPLY)
+
+enum ct_timeout {
+#define CT_TIMEOUT(NAME) CT_TM_##NAME,
+    CT_TIMEOUTS
+#undef CT_TIMEOUT
+    N_CT_TM
+};
+
+#define EXP_LISTS 100
+
 enum OVS_PACKED_ENUM ct_conn_type {
     CT_CONN_TYPE_DEFAULT,
     CT_CONN_TYPE_UN_NAT,
@@ -96,11 +122,16 @@  struct conn {
     struct conn_key key;
     struct conn_key rev_key;
     struct conn_key parent_key; /* Only used for orig_tuple support. */
-    struct ovs_list exp_node;
     struct cmap_node cm_node;
     uint16_t nat_action;
     char *alg;
     struct conn *nat_conn; /* The NAT 'conn' context, if there is one. */
+    atomic_flag reclaimed; /* False during the lifetime of the connection,
+                            * True as soon as a thread has started freeing
+                            * its memory. */
+
+    /* Inserted once by a PMD, then managed by the 'ct_clean' thread. */
+    struct rculist node;
 
     /* Mutable data. */
     struct ovs_mutex lock; /* Guards all mutable fields. */
@@ -116,7 +147,6 @@  struct conn {
     /* Mutable data. */
     bool seq_skew_dir; /* TCP sequence skew direction due to NATTing of FTP
                         * control messages; true if reply direction. */
-    bool cleaned; /* True if cleaned from expiry lists. */
 
     /* Immutable data. */
     bool alg_related; /* True if alg data connection. */
@@ -132,22 +162,6 @@  enum ct_update_res {
     CT_UPDATE_VALID_NEW,
 };
 
-/* Timeouts: all the possible timeout states passed to update_expiration()
- * are listed here. The name will be prefix by CT_TM_ and the value is in
- * milliseconds */
-#define CT_TIMEOUTS \
-    CT_TIMEOUT(TCP_FIRST_PACKET) \
-    CT_TIMEOUT(TCP_OPENING) \
-    CT_TIMEOUT(TCP_ESTABLISHED) \
-    CT_TIMEOUT(TCP_CLOSING) \
-    CT_TIMEOUT(TCP_FIN_WAIT) \
-    CT_TIMEOUT(TCP_CLOSED) \
-    CT_TIMEOUT(OTHER_FIRST) \
-    CT_TIMEOUT(OTHER_MULTIPLE) \
-    CT_TIMEOUT(OTHER_BIDIR) \
-    CT_TIMEOUT(ICMP_FIRST) \
-    CT_TIMEOUT(ICMP_REPLY)
-
 #define NAT_ACTION_SNAT_ALL (NAT_ACTION_SRC | NAT_ACTION_SRC_PORT)
 #define NAT_ACTION_DNAT_ALL (NAT_ACTION_DST | NAT_ACTION_DST_PORT)
 
@@ -181,22 +195,17 @@  enum ct_ephemeral_range {
 #define FOR_EACH_PORT_IN_RANGE(curr, min, max) \
     FOR_EACH_PORT_IN_RANGE__(curr, min, max, OVS_JOIN(idx, __COUNTER__))
 
-enum ct_timeout {
-#define CT_TIMEOUT(NAME) CT_TM_##NAME,
-    CT_TIMEOUTS
-#undef CT_TIMEOUT
-    N_CT_TM
-};
-
 struct conntrack {
     struct ovs_mutex ct_lock; /* Protects 2 following fields. */
     struct cmap conns OVS_GUARDED;
-    struct ovs_list exp_lists[N_CT_TM] OVS_GUARDED;
+    struct rculist exp_lists[EXP_LISTS];
     struct cmap zone_limits OVS_GUARDED;
     struct cmap timeout_policies OVS_GUARDED;
     uint32_t hash_basis; /* Salt for hashing a connection key. */
     pthread_t clean_thread; /* Periodically cleans up connection tracker. */
     struct latch clean_thread_exit; /* To destroy the 'clean_thread'. */
+    atomic_uint ct_next_list;
+    unsigned int next_sweep;
 
     /* Counting connections. */
     atomic_count n_conn; /* Number of connections currently tracked. */
@@ -216,8 +225,8 @@  struct conntrack {
 };
 
 /* Lock acquisition order:
- *    1. 'ct_lock'
- *    2. 'conn->lock'
+ *    1. 'conn->lock'
+ *    2. 'ct_lock'
  *    3. 'resources_lock'
  */
 
@@ -237,4 +246,23 @@  struct ct_l4_proto {
                                struct ct_dpif_protoinfo *);
 };
 
+static unsigned int
+ct_next_list(struct conntrack *ct)
+{
+    unsigned int old;
+
+    atomic_add_relaxed(&ct->ct_next_list, 1u, &old);
+
+    return old % EXP_LISTS;
+}
+
+static inline void
+conn_expire_push_front(struct conntrack *ct, struct conn *conn)
+    OVS_REQUIRES(ct->ct_lock)
+{
+    unsigned int next = ct_next_list(ct);
+
+    rculist_push_front(&ct->exp_lists[next], &conn->node);
+}
+
 #endif /* conntrack-private.h */
diff --git a/lib/conntrack-tp.c b/lib/conntrack-tp.c
index c2245038b..7b8f9007b 100644
--- a/lib/conntrack-tp.c
+++ b/lib/conntrack-tp.c
@@ -236,27 +236,6 @@  tm_to_ct_dpif_tp(enum ct_timeout tm)
     return CT_DPIF_TP_ATTR_MAX;
 }
 
-static void
-conn_update_expiration__(struct conntrack *ct, struct conn *conn,
-                         enum ct_timeout tm, long long now,
-                         uint32_t tp_value)
-    OVS_REQUIRES(conn->lock)
-{
-    ovs_mutex_unlock(&conn->lock);
-
-    ovs_mutex_lock(&ct->ct_lock);
-    ovs_mutex_lock(&conn->lock);
-    if (!conn->cleaned) {
-        conn->expiration = now + tp_value * 1000;
-        ovs_list_remove(&conn->exp_node);
-        ovs_list_push_back(&ct->exp_lists[tm], &conn->exp_node);
-    }
-    ovs_mutex_unlock(&conn->lock);
-    ovs_mutex_unlock(&ct->ct_lock);
-
-    ovs_mutex_lock(&conn->lock);
-}
-
 /* The conn entry lock must be held on entry and exit. */
 void
 conn_update_expiration(struct conntrack *ct, struct conn *conn,
@@ -266,41 +245,22 @@  conn_update_expiration(struct conntrack *ct, struct conn *conn,
     struct timeout_policy *tp;
     uint32_t val;
 
-    ovs_mutex_unlock(&conn->lock);
-
-    ovs_mutex_lock(&ct->ct_lock);
-    ovs_mutex_lock(&conn->lock);
     tp = timeout_policy_lookup(ct, conn->tp_id);
     if (tp) {
         val = tp->policy.attrs[tm_to_ct_dpif_tp(tm)];
     } else {
         val = ct_dpif_netdev_tp_def[tm_to_ct_dpif_tp(tm)];
     }
-    ovs_mutex_unlock(&conn->lock);
-    ovs_mutex_unlock(&ct->ct_lock);
-
-    ovs_mutex_lock(&conn->lock);
     VLOG_DBG_RL(&rl, "Update timeout %s zone=%u with policy id=%d "
                 "val=%u sec.",
                 ct_timeout_str[tm], conn->key.zone, conn->tp_id, val);
 
-    conn_update_expiration__(ct, conn, tm, now, val);
+    conn->expiration = now + val * 1000;
 }
 
-static void
-conn_init_expiration__(struct conntrack *ct, struct conn *conn,
-                       enum ct_timeout tm, long long now,
-                       uint32_t tp_value)
-{
-    conn->expiration = now + tp_value * 1000;
-    ovs_list_push_back(&ct->exp_lists[tm], &conn->exp_node);
-}
-
-/* ct_lock must be held. */
 void
 conn_init_expiration(struct conntrack *ct, struct conn *conn,
                      enum ct_timeout tm, long long now)
-    OVS_REQUIRES(ct->ct_lock)
 {
     struct timeout_policy *tp;
     uint32_t val;
@@ -315,5 +275,5 @@  conn_init_expiration(struct conntrack *ct, struct conn *conn,
     VLOG_DBG_RL(&rl, "Init timeout %s zone=%u with policy id=%d val=%u sec.",
                 ct_timeout_str[tm], conn->key.zone, conn->tp_id, val);
 
-    conn_init_expiration__(ct, conn, tm, now, val);
+    conn->expiration = now + val * 1000;
 }
diff --git a/lib/conntrack.c b/lib/conntrack.c
index 38ddc9b91..819c356c1 100644
--- a/lib/conntrack.c
+++ b/lib/conntrack.c
@@ -39,12 +39,12 @@ 
 #include "ovs-thread.h"
 #include "openvswitch/poll-loop.h"
 #include "random.h"
+#include "rculist.h"
 #include "timeval.h"
 
 VLOG_DEFINE_THIS_MODULE(conntrack);
 
 COVERAGE_DEFINE(conntrack_full);
-COVERAGE_DEFINE(conntrack_long_cleanup);
 COVERAGE_DEFINE(conntrack_l3csum_err);
 COVERAGE_DEFINE(conntrack_l4csum_err);
 COVERAGE_DEFINE(conntrack_lookup_natted_miss);
@@ -94,9 +94,8 @@  static bool valid_new(struct dp_packet *pkt, struct conn_key *);
 static struct conn *new_conn(struct conntrack *ct, struct dp_packet *pkt,
                              struct conn_key *, long long now,
                              uint32_t tp_id);
-static void delete_conn_cmn(struct conn *);
+static void delete_conn__(struct conn *);
 static void delete_conn(struct conn *);
-static void delete_conn_one(struct conn *conn);
 static enum ct_update_res conn_update(struct conntrack *ct, struct conn *conn,
                                       struct dp_packet *pkt,
                                       struct conn_lookup_ctx *ctx,
@@ -309,7 +308,7 @@  conntrack_init(void)
     ovs_mutex_lock(&ct->ct_lock);
     cmap_init(&ct->conns);
     for (unsigned i = 0; i < ARRAY_SIZE(ct->exp_lists); i++) {
-        ovs_list_init(&ct->exp_lists[i]);
+        rculist_init(&ct->exp_lists[i]);
     }
     cmap_init(&ct->zone_limits);
     ct->zone_limit_seq = 0;
@@ -319,6 +318,7 @@  conntrack_init(void)
     atomic_count_init(&ct->n_conn, 0);
     atomic_init(&ct->n_conn_limit, DEFAULT_N_CONN_LIMIT);
     atomic_init(&ct->tcp_seq_chk, true);
+    atomic_init(&ct->ct_next_list, 0);
     latch_init(&ct->clean_thread_exit);
     ct->clean_thread = ovs_thread_create("ct_clean", clean_thread_main, ct);
     ct->ipf = ipf_init();
@@ -467,7 +467,7 @@  zone_limit_delete(struct conntrack *ct, uint16_t zone)
 }
 
 static void
-conn_clean_cmn(struct conntrack *ct, struct conn *conn)
+conn_clean__(struct conntrack *ct, struct conn *conn)
     OVS_REQUIRES(ct->ct_lock)
 {
     if (conn->alg) {
@@ -487,32 +487,34 @@  conn_clean_cmn(struct conntrack *ct, struct conn *conn)
  * removes the associated nat 'conn' from the lookup datastructures. */
 static void
 conn_clean(struct conntrack *ct, struct conn *conn)
-    OVS_REQUIRES(ct->ct_lock)
+    OVS_EXCLUDED(conn->lock, ct->ct_lock)
 {
     ovs_assert(conn->conn_type == CT_CONN_TYPE_DEFAULT);
 
-    conn_clean_cmn(ct, conn);
+    if (atomic_flag_test_and_set(&conn->reclaimed)) {
+        return;
+    }
+
+    ovs_mutex_lock(&ct->ct_lock);
+    conn_clean__(ct, conn);
     if (conn->nat_conn) {
         uint32_t hash = conn_key_hash(&conn->nat_conn->key, ct->hash_basis);
         cmap_remove(&ct->conns, &conn->nat_conn->cm_node, hash);
     }
-    ovs_list_remove(&conn->exp_node);
-    conn->cleaned = true;
+
+    rculist_remove(&conn->node);
+    ovs_mutex_unlock(&ct->ct_lock);
+
     ovsrcu_postpone(delete_conn, conn);
     atomic_count_dec(&ct->n_conn);
 }
 
 static void
-conn_clean_one(struct conntrack *ct, struct conn *conn)
-    OVS_REQUIRES(ct->ct_lock)
+conn_force_expire(struct conn *conn)
 {
-    conn_clean_cmn(ct, conn);
-    if (conn->conn_type == CT_CONN_TYPE_DEFAULT) {
-        ovs_list_remove(&conn->exp_node);
-        conn->cleaned = true;
-        atomic_count_dec(&ct->n_conn);
-    }
-    ovsrcu_postpone(delete_conn_one, conn);
+    ovs_mutex_lock(&conn->lock);
+    conn->expiration = 0;
+    ovs_mutex_unlock(&conn->lock);
 }
 
 /* Destroys the connection tracker 'ct' and frees all the allocated memory.
@@ -522,15 +524,16 @@  void
 conntrack_destroy(struct conntrack *ct)
 {
     struct conn *conn;
+
     latch_set(&ct->clean_thread_exit);
     pthread_join(ct->clean_thread, NULL);
     latch_destroy(&ct->clean_thread_exit);
 
-    ovs_mutex_lock(&ct->ct_lock);
-    CMAP_FOR_EACH (conn, cm_node, &ct->conns) {
-        conn_clean_one(ct, conn);
+    for (unsigned i = 0; i < EXP_LISTS; i++) {
+        RCULIST_FOR_EACH (conn, node, &ct->exp_lists[i]) {
+            conn_clean(ct, conn);
+        }
     }
-    cmap_destroy(&ct->conns);
 
     struct zone_limit *zl;
     CMAP_FOR_EACH (zl, node, &ct->zone_limits) {
@@ -539,7 +542,6 @@  conntrack_destroy(struct conntrack *ct)
         cmap_remove(&ct->zone_limits, &zl->node, hash);
         ovsrcu_postpone(free, zl);
     }
-    cmap_destroy(&ct->zone_limits);
 
     struct timeout_policy *tp;
     CMAP_FOR_EACH (tp, node, &ct->timeout_policies) {
@@ -548,6 +550,11 @@  conntrack_destroy(struct conntrack *ct)
         cmap_remove(&ct->timeout_policies, &tp->node, hash);
         ovsrcu_postpone(free, tp);
     }
+
+    ovs_mutex_lock(&ct->ct_lock);
+
+    cmap_destroy(&ct->conns);
+    cmap_destroy(&ct->zone_limits);
     cmap_destroy(&ct->timeout_policies);
 
     ovs_mutex_unlock(&ct->ct_lock);
@@ -1087,7 +1094,9 @@  conn_not_found(struct conntrack *ct, struct dp_packet *pkt,
         nc->nat_conn = nat_conn;
         ovs_mutex_init_adaptive(&nc->lock);
         nc->conn_type = CT_CONN_TYPE_DEFAULT;
+        atomic_flag_clear(&nc->reclaimed);
         cmap_insert(&ct->conns, &nc->cm_node, ctx->hash);
+        conn_expire_push_front(ct, nc);
         atomic_count_inc(&ct->n_conn);
         ctx->conn = nc; /* For completeness. */
         if (zl) {
@@ -1108,8 +1117,7 @@  conn_not_found(struct conntrack *ct, struct dp_packet *pkt,
      * can limit DoS impact. */
 nat_res_exhaustion:
     free(nat_conn);
-    ovs_list_remove(&nc->exp_node);
-    delete_conn_cmn(nc);
+    delete_conn__(nc);
     static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
     VLOG_WARN_RL(&rl, "Unable to NAT due to tuple space exhaustion - "
                  "if DoS attack, use firewalling and/or zone partitioning.");
@@ -1148,11 +1156,9 @@  conn_update_state(struct conntrack *ct, struct dp_packet *pkt,
             pkt->md.ct_state = CS_INVALID;
             break;
         case CT_UPDATE_NEW:
-            ovs_mutex_lock(&ct->ct_lock);
             if (conn_lookup(ct, &conn->key, now, NULL, NULL)) {
-                conn_clean(ct, conn);
+                conn_force_expire(conn);
             }
-            ovs_mutex_unlock(&ct->ct_lock);
             create_new_conn = true;
             break;
         case CT_UPDATE_VALID_NEW:
@@ -1363,11 +1369,9 @@  process_one(struct conntrack *ct, struct dp_packet *pkt,
 
     /* Delete found entry if in wrong direction. 'force' implies commit. */
     if (OVS_UNLIKELY(force && ctx->reply && conn)) {
-        ovs_mutex_lock(&ct->ct_lock);
         if (conn_lookup(ct, &conn->key, now, NULL, NULL)) {
-            conn_clean(ct, conn);
+            conn_force_expire(conn);
         }
-        ovs_mutex_unlock(&ct->ct_lock);
         conn = NULL;
     }
 
@@ -1553,39 +1557,21 @@  set_label(struct dp_packet *pkt, struct conn *conn,
  * LLONG_MAX if 'ctb' is empty.  The return value might be smaller than 'now',
  * if 'limit' is reached */
 static long long
-ct_sweep(struct conntrack *ct, long long now, size_t limit)
+ct_sweep(struct conntrack *ct, struct rculist *list, long long now)
+    OVS_NO_THREAD_SAFETY_ANALYSIS
 {
     struct conn *conn;
-    long long min_expiration = LLONG_MAX;
     size_t count = 0;
 
-    ovs_mutex_lock(&ct->ct_lock);
-
-    for (unsigned i = 0; i < N_CT_TM; i++) {
-        LIST_FOR_EACH_SAFE (conn, exp_node, &ct->exp_lists[i]) {
-            ovs_mutex_lock(&conn->lock);
-            if (now < conn->expiration || count >= limit) {
-                min_expiration = MIN(min_expiration, conn->expiration);
-                ovs_mutex_unlock(&conn->lock);
-                if (count >= limit) {
-                    /* Do not check other lists. */
-                    COVERAGE_INC(conntrack_long_cleanup);
-                    goto out;
-                }
-                break;
-            } else {
-                ovs_mutex_unlock(&conn->lock);
-                conn_clean(ct, conn);
-            }
-            count++;
+    RCULIST_FOR_EACH (conn, node, list) {
+        if (conn_expired(conn, now)) {
+            conn_clean(ct, conn);
         }
+
+        count++;
     }
 
-out:
-    VLOG_DBG("conntrack cleanup %"PRIuSIZE" entries in %lld msec", count,
-             time_msec() - now);
-    ovs_mutex_unlock(&ct->ct_lock);
-    return min_expiration;
+    return count;
 }
 
 /* Cleans up old connection entries from 'ct'.  Returns the time when the
@@ -1595,11 +1581,26 @@  out:
 static long long
 conntrack_clean(struct conntrack *ct, long long now)
 {
-    unsigned int n_conn_limit;
+    long long next_wakeup = now + 30 * 1000;
+    unsigned int n_conn_limit, i, count = 0;
+    size_t clean_end;
+
     atomic_read_relaxed(&ct->n_conn_limit, &n_conn_limit);
-    size_t clean_max = n_conn_limit > 10 ? n_conn_limit / 10 : 1;
-    long long min_exp = ct_sweep(ct, now, clean_max);
-    long long next_wakeup = MIN(min_exp, now + CT_DPIF_NETDEV_TP_MIN);
+    clean_end = n_conn_limit / 64;
+
+    for (i = ct->next_sweep; i < EXP_LISTS; i++) {
+        count += ct_sweep(ct, &ct->exp_lists[i], now);
+
+        if (count > clean_end) {
+            next_wakeup = 0;
+            break;
+        }
+    }
+
+    ct->next_sweep = (i < EXP_LISTS) ? i : 0;
+
+    VLOG_DBG("conntrack cleanup %"PRIu32" entries in %lld msec", count,
+             time_msec() - now);
 
     return next_wakeup;
 }
@@ -1628,6 +1629,7 @@  conntrack_clean(struct conntrack *ct, long long now)
 
 static void *
 clean_thread_main(void *f_)
+    OVS_NO_THREAD_SAFETY_ANALYSIS
 {
     struct conntrack *ct = f_;
 
@@ -2539,7 +2541,7 @@  new_conn(struct conntrack *ct, struct dp_packet *pkt, struct conn_key *key,
 }
 
 static void
-delete_conn_cmn(struct conn *conn)
+delete_conn__(struct conn *conn)
 {
     free(conn->alg);
     free(conn);
@@ -2551,18 +2553,9 @@  delete_conn(struct conn *conn)
     ovs_assert(conn->conn_type == CT_CONN_TYPE_DEFAULT);
     ovs_mutex_destroy(&conn->lock);
     free(conn->nat_conn);
-    delete_conn_cmn(conn);
+    delete_conn__(conn);
 }
 
-/* Only used by conn_clean_one(). */
-static void
-delete_conn_one(struct conn *conn)
-{
-    if (conn->conn_type == CT_CONN_TYPE_DEFAULT) {
-        ovs_mutex_destroy(&conn->lock);
-    }
-    delete_conn_cmn(conn);
-}
 
 /* Convert a conntrack address 'a' into an IP address 'b' based on 'dl_type'.
  *
@@ -2714,6 +2707,11 @@  conntrack_dump_next(struct conntrack_dump *dump, struct ct_dpif_entry *entry)
         }
         struct conn *conn;
         INIT_CONTAINER(conn, cm_node, cm_node);
+
+        if (conn_expired(conn, now)) {
+            continue;
+        }
+
         if ((!dump->filter_zone || conn->key.zone == dump->zone) &&
             (conn->conn_type != CT_CONN_TYPE_UN_NAT)) {
             conn_to_ct_dpif_entry(conn, entry, now);
@@ -2735,13 +2733,15 @@  conntrack_flush(struct conntrack *ct, const uint16_t *zone)
 {
     struct conn *conn;
 
-    ovs_mutex_lock(&ct->ct_lock);
     CMAP_FOR_EACH (conn, cm_node, &ct->conns) {
+        if (conn->conn_type != CT_CONN_TYPE_DEFAULT) {
+            continue;
+        }
+
         if (!zone || *zone == conn->key.zone) {
-            conn_clean_one(ct, conn);
+            conn_clean(ct, conn);
         }
     }
-    ovs_mutex_unlock(&ct->ct_lock);
 
     return 0;
 }
@@ -2756,7 +2756,6 @@  conntrack_flush_tuple(struct conntrack *ct, const struct ct_dpif_tuple *tuple,
 
     memset(&key, 0, sizeof(key));
     tuple_to_conn_key(tuple, zone, &key);
-    ovs_mutex_lock(&ct->ct_lock);
     conn_lookup(ct, &key, time_msec(), &conn, NULL);
 
     if (conn && conn->conn_type == CT_CONN_TYPE_DEFAULT) {
@@ -2766,7 +2765,6 @@  conntrack_flush_tuple(struct conntrack *ct, const struct ct_dpif_tuple *tuple,
         error = ENOENT;
     }
 
-    ovs_mutex_unlock(&ct->ct_lock);
     return error;
 }