@@ -29,6 +29,7 @@
#include "openvswitch/list.h"
#include "openvswitch/types.h"
#include "packets.h"
+#include "mpsc-queue.h"
#include "unaligned.h"
#include "dp-packet.h"
@@ -86,22 +87,57 @@ struct alg_exp_node {
bool nat_rpl_dst;
};
+/* Timeouts: all the possible timeout states passed to update_expiration()
+ * are listed here. The name will be prefixed by CT_TM_ and the value is
+ * in milliseconds. */
+#define CT_TIMEOUTS \
+ CT_TIMEOUT(TCP_FIRST_PACKET) \
+ CT_TIMEOUT(TCP_OPENING) \
+ CT_TIMEOUT(TCP_ESTABLISHED) \
+ CT_TIMEOUT(TCP_CLOSING) \
+ CT_TIMEOUT(TCP_FIN_WAIT) \
+ CT_TIMEOUT(TCP_CLOSED) \
+ CT_TIMEOUT(OTHER_FIRST) \
+ CT_TIMEOUT(OTHER_MULTIPLE) \
+ CT_TIMEOUT(OTHER_BIDIR) \
+ CT_TIMEOUT(ICMP_FIRST) \
+ CT_TIMEOUT(ICMP_REPLY)
+
+enum ct_timeout {
+#define CT_TIMEOUT(NAME) CT_TM_##NAME,
+ CT_TIMEOUTS
+#undef CT_TIMEOUT
+ N_CT_TM
+};
+
enum OVS_PACKED_ENUM ct_conn_type {
CT_CONN_TYPE_DEFAULT,
CT_CONN_TYPE_UN_NAT,
};
+struct conn_expire {
+ struct mpsc_queue_node node;
+ /* Timeout state of the connection.
+ * It follows the connection state updates.
+ */
+ enum ct_timeout tm;
+ atomic_flag reschedule;
+ struct ovs_refcount refcount;
+};
+
struct conn {
/* Immutable data. */
struct conn_key key;
struct conn_key rev_key;
struct conn_key parent_key; /* Only used for orig_tuple support. */
- struct ovs_list exp_node;
struct cmap_node cm_node;
struct nat_action_info_t *nat_info;
char *alg;
struct conn *nat_conn; /* The NAT 'conn' context, if there is one. */
+ /* Inserted once by a PMD, then managed by the 'ct_clean' thread. */
+ struct conn_expire exp;
+
/* Mutable data. */
struct ovs_mutex lock; /* Guards all mutable fields. */
ovs_u128 label;
@@ -132,33 +168,10 @@ enum ct_update_res {
CT_UPDATE_VALID_NEW,
};
-/* Timeouts: all the possible timeout states passed to update_expiration()
- * are listed here. The name will be prefix by CT_TM_ and the value is in
- * milliseconds */
-#define CT_TIMEOUTS \
- CT_TIMEOUT(TCP_FIRST_PACKET) \
- CT_TIMEOUT(TCP_OPENING) \
- CT_TIMEOUT(TCP_ESTABLISHED) \
- CT_TIMEOUT(TCP_CLOSING) \
- CT_TIMEOUT(TCP_FIN_WAIT) \
- CT_TIMEOUT(TCP_CLOSED) \
- CT_TIMEOUT(OTHER_FIRST) \
- CT_TIMEOUT(OTHER_MULTIPLE) \
- CT_TIMEOUT(OTHER_BIDIR) \
- CT_TIMEOUT(ICMP_FIRST) \
- CT_TIMEOUT(ICMP_REPLY)
-
-enum ct_timeout {
-#define CT_TIMEOUT(NAME) CT_TM_##NAME,
- CT_TIMEOUTS
-#undef CT_TIMEOUT
- N_CT_TM
-};
-
struct conntrack {
struct ovs_mutex ct_lock; /* Protects 2 following fields. */
struct cmap conns OVS_GUARDED;
- struct ovs_list exp_lists[N_CT_TM] OVS_GUARDED;
+ struct mpsc_queue exp_lists[N_CT_TM];
struct hmap zone_limits OVS_GUARDED;
struct hmap timeout_policies OVS_GUARDED;
uint32_t hash_basis; /* Salt for hashing a connection key. */
@@ -204,4 +217,25 @@ struct ct_l4_proto {
struct ct_dpif_protoinfo *);
};
+static inline void
+conn_expire_append(struct conntrack *ct, struct conn *conn)
+{
+ if (ovs_refcount_try_ref_rcu(&conn->exp.refcount)) {
+ atomic_flag_clear(&conn->exp.reschedule);
+ mpsc_queue_insert(&ct->exp_lists[conn->exp.tm], &conn->exp.node);
+ }
+}
+
+static inline void
+conn_expire_prepend(struct conntrack *ct, struct conn *conn)
+ OVS_REQUIRES(ct->exp_lists[conn->exp.tm].read_lock)
+{
+ if (ovs_refcount_try_ref_rcu(&conn->exp.refcount)) {
+        /* Do not change the 'reschedule' state: if this expire node is put
+         * at the tail of the list, it will be re-examined next sweep.
+         */
+ mpsc_queue_push_back(&ct->exp_lists[conn->exp.tm], &conn->exp.node);
+ }
+}
+
#endif /* conntrack-private.h */
@@ -230,6 +230,15 @@ tm_to_ct_dpif_tp(enum ct_timeout tm)
return CT_DPIF_TP_ATTR_MAX;
}
+static void
+conn_schedule_expiration(struct conn *conn, enum ct_timeout tm, long long now,
+ uint32_t tp_value)
+{
+ conn->expiration = now + tp_value * 1000;
+ conn->exp.tm = tm;
+ ignore(atomic_flag_test_and_set(&conn->exp.reschedule));
+}
+
static void
conn_update_expiration__(struct conntrack *ct, struct conn *conn,
enum ct_timeout tm, long long now,
@@ -240,11 +249,7 @@ conn_update_expiration__(struct conntrack *ct, struct conn *conn,
ovs_mutex_lock(&ct->ct_lock);
ovs_mutex_lock(&conn->lock);
- if (!conn->cleaned) {
- conn->expiration = now + tp_value * 1000;
- ovs_list_remove(&conn->exp_node);
- ovs_list_push_back(&ct->exp_lists[tm], &conn->exp_node);
- }
+ conn_schedule_expiration(conn, tm, now, tp_value);
ovs_mutex_unlock(&conn->lock);
ovs_mutex_unlock(&ct->ct_lock);
@@ -281,15 +286,6 @@ conn_update_expiration(struct conntrack *ct, struct conn *conn,
conn_update_expiration__(ct, conn, tm, now, val);
}
-static void
-conn_init_expiration__(struct conntrack *ct, struct conn *conn,
- enum ct_timeout tm, long long now,
- uint32_t tp_value)
-{
- conn->expiration = now + tp_value * 1000;
- ovs_list_push_back(&ct->exp_lists[tm], &conn->exp_node);
-}
-
/* ct_lock must be held. */
void
conn_init_expiration(struct conntrack *ct, struct conn *conn,
@@ -309,5 +305,7 @@ conn_init_expiration(struct conntrack *ct, struct conn *conn,
VLOG_DBG_RL(&rl, "Init timeout %s zone=%u with policy id=%d val=%u sec.",
ct_timeout_str[tm], conn->key.zone, conn->tp_id, val);
- conn_init_expiration__(ct, conn, tm, now, val);
+ atomic_flag_clear(&conn->exp.reschedule);
+ ovs_refcount_init(&conn->exp.refcount);
+ conn_schedule_expiration(conn, tm, now, val);
}
@@ -301,7 +301,7 @@ conntrack_init(void)
ovs_mutex_lock(&ct->ct_lock);
cmap_init(&ct->conns);
for (unsigned i = 0; i < ARRAY_SIZE(ct->exp_lists); i++) {
- ovs_list_init(&ct->exp_lists[i]);
+ mpsc_queue_init(&ct->exp_lists[i]);
}
hmap_init(&ct->zone_limits);
ct->zone_limit_seq = 0;
@@ -453,6 +453,17 @@ conn_clean_cmn(struct conntrack *ct, struct conn *conn)
}
}
+static inline bool
+conn_unref(struct conn *conn)
+{
+ ovs_assert(conn->conn_type == CT_CONN_TYPE_DEFAULT);
+ if (ovs_refcount_unref(&conn->exp.refcount) == 1) {
+ ovsrcu_postpone(delete_conn, conn);
+ return true;
+ }
+ return false;
+}
+
/* Must be called with 'conn' of 'conn_type' CT_CONN_TYPE_DEFAULT. Also
* removes the associated nat 'conn' from the lookup datastructures. */
static void
@@ -466,23 +477,31 @@ conn_clean(struct conntrack *ct, struct conn *conn)
uint32_t hash = conn_key_hash(&conn->nat_conn->key, ct->hash_basis);
cmap_remove(&ct->conns, &conn->nat_conn->cm_node, hash);
}
- ovs_list_remove(&conn->exp_node);
conn->cleaned = true;
- ovsrcu_postpone(delete_conn, conn);
+ conn_unref(conn);
atomic_count_dec(&ct->n_conn);
}
+static inline bool
+conn_unref_one(struct conn *conn)
+{
+ if (ovs_refcount_unref(&conn->exp.refcount) == 1) {
+ ovsrcu_postpone(delete_conn_one, conn);
+ return true;
+ }
+ return false;
+}
+
static void
conn_clean_one(struct conntrack *ct, struct conn *conn)
OVS_REQUIRES(ct->ct_lock)
{
conn_clean_cmn(ct, conn);
if (conn->conn_type == CT_CONN_TYPE_DEFAULT) {
- ovs_list_remove(&conn->exp_node);
conn->cleaned = true;
atomic_count_dec(&ct->n_conn);
}
- ovsrcu_postpone(delete_conn_one, conn);
+ conn_unref_one(conn);
}
/* Destroys the connection tracker 'ct' and frees all the allocated memory.
@@ -497,6 +516,19 @@ conntrack_destroy(struct conntrack *ct)
latch_destroy(&ct->clean_thread_exit);
ovs_mutex_lock(&ct->ct_lock);
+
+ for (unsigned i = 0; i < N_CT_TM; i++) {
+ struct mpsc_queue_node *node;
+
+ mpsc_queue_acquire(&ct->exp_lists[i]);
+ MPSC_QUEUE_FOR_EACH_POP (node, &ct->exp_lists[i]) {
+ conn = CONTAINER_OF(node, struct conn, exp.node);
+ conn_unref(conn);
+ }
+ mpsc_queue_release(&ct->exp_lists[i]);
+ mpsc_queue_destroy(&ct->exp_lists[i]);
+ }
+
CMAP_FOR_EACH (conn, cm_node, &ct->conns) {
conn_clean_one(ct, conn);
}
@@ -1055,6 +1087,7 @@ conn_not_found(struct conntrack *ct, struct dp_packet *pkt,
ovs_mutex_init_adaptive(&nc->lock);
nc->conn_type = CT_CONN_TYPE_DEFAULT;
cmap_insert(&ct->conns, &nc->cm_node, ctx->hash);
+ conn_expire_append(ct, nc);
atomic_count_inc(&ct->n_conn);
ctx->conn = nc; /* For completeness. */
if (zl) {
@@ -1075,7 +1108,6 @@ conn_not_found(struct conntrack *ct, struct dp_packet *pkt,
* can limit DoS impact. */
nat_res_exhaustion:
free(nat_conn);
- ovs_list_remove(&nc->exp_node);
delete_conn_cmn(nc);
static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
VLOG_WARN_RL(&rl, "Unable to NAT due to tuple space exhaustion - "
@@ -1492,29 +1524,72 @@ set_label(struct dp_packet *pkt, struct conn *conn,
* if 'limit' is reached */
static long long
ct_sweep(struct conntrack *ct, long long now, size_t limit)
+ OVS_NO_THREAD_SAFETY_ANALYSIS
{
- struct conn *conn, *next;
+ struct conn *conn;
long long min_expiration = LLONG_MAX;
+ struct mpsc_queue_node *node;
size_t count = 0;
ovs_mutex_lock(&ct->ct_lock);
for (unsigned i = 0; i < N_CT_TM; i++) {
- LIST_FOR_EACH_SAFE (conn, next, exp_node, &ct->exp_lists[i]) {
+ struct conn *end_of_queue = NULL;
+
+ MPSC_QUEUE_FOR_EACH_POP (node, &ct->exp_lists[i]) {
+ long long int expiration;
+
+ conn = CONTAINER_OF(node, struct conn, exp.node);
+ if (conn_unref(conn)) {
+ continue;
+ }
+
ovs_mutex_lock(&conn->lock);
- if (now < conn->expiration || count >= limit) {
- min_expiration = MIN(min_expiration, conn->expiration);
- ovs_mutex_unlock(&conn->lock);
- if (count >= limit) {
- /* Do not check other lists. */
- COVERAGE_INC(conntrack_long_cleanup);
- goto out;
- }
+ expiration = conn->expiration;
+ ovs_mutex_unlock(&conn->lock);
+
+ if (conn == end_of_queue) {
+ /* If we already re-enqueued this conn during this sweep,
+ * stop iterating this list and skip to the next.
+ */
+ min_expiration = MIN(min_expiration, expiration);
+ conn_expire_prepend(ct, conn);
break;
+ }
+
+ if (count >= limit) {
+ min_expiration = MIN(min_expiration, expiration);
+ conn_expire_prepend(ct, conn);
+ COVERAGE_INC(conntrack_long_cleanup);
+ /* Do not check other lists. */
+ goto out;
+ }
+
+ if (now < expiration) {
+ if (atomic_flag_test_and_set(&conn->exp.reschedule)) {
+ /* Reschedule was true, another thread marked
+ * this conn to be enqueued again.
+ * The conn is not yet expired, still valid, and
+ * this list should still be iterated.
+ */
+ conn_expire_append(ct, conn);
+ if (end_of_queue == NULL) {
+ end_of_queue = conn;
+ }
+ } else {
+ /* This connection is still valid, while no other thread
+ * modified it: it means this list iteration is finished
+ * for now. Put back the connection within the list.
+ */
+ atomic_flag_clear(&conn->exp.reschedule);
+ conn_expire_prepend(ct, conn);
+ min_expiration = MIN(min_expiration, expiration);
+ break;
+ }
} else {
- ovs_mutex_unlock(&conn->lock);
conn_clean(ct, conn);
}
+
count++;
}
}
@@ -1566,9 +1641,14 @@ conntrack_clean(struct conntrack *ct, long long now)
static void *
clean_thread_main(void *f_)
+ OVS_NO_THREAD_SAFETY_ANALYSIS
{
struct conntrack *ct = f_;
+ for (unsigned i = 0; i < N_CT_TM; i++) {
+ mpsc_queue_acquire(&ct->exp_lists[i]);
+ }
+
while (!latch_is_set(&ct->clean_thread_exit)) {
long long next_wake;
long long now = time_msec();
@@ -1583,6 +1663,10 @@ clean_thread_main(void *f_)
poll_block();
}
+ for (unsigned i = 0; i < N_CT_TM; i++) {
+ mpsc_queue_release(&ct->exp_lists[i]);
+ }
+
return NULL;
}