On 10 June 2016 at 15:47, Daniele Di Proietto <diproiet...@vmware.com> wrote: > This commit adds a thread that periodically removes expired connections. > > The expiration time of a connection can be expressed by: > > expiration = now + timeout > > For each possible 'timeout' value (there aren't many) we keep a list. > When the expiration is updated, we move the connection to the back of the > corresponding 'timeout' list. This way, the list is always ordered by > 'expiration'. > > When the cleanup thread iterates through the lists for expired > connections, it can stop at the first non-expired connection. > > Suggested-by: Joe Stringer <j...@ovn.org> > Signed-off-by: Daniele Di Proietto <diproiet...@vmware.com> > --- > lib/conntrack-other.c | 11 +-- > lib/conntrack-private.h | 21 ++++-- > lib/conntrack-tcp.c | 20 +++--- > lib/conntrack.c | 184 > ++++++++++++++++++++++++++++++++++++++++++++---- > lib/conntrack.h | 32 ++++++++- > 5 files changed, 237 insertions(+), 31 deletions(-) > > diff --git a/lib/conntrack-other.c b/lib/conntrack-other.c > index 295cb2c..2920889 100644 > --- a/lib/conntrack-other.c > +++ b/lib/conntrack-other.c > @@ -43,8 +43,8 @@ conn_other_cast(const struct conn *conn) > } > > static enum ct_update_res > -other_conn_update(struct conn *conn_, struct dp_packet *pkt OVS_UNUSED, > - bool reply, long long now) > +other_conn_update(struct conn *conn_, struct conntrack_bucket *ctb, > + struct dp_packet *pkt OVS_UNUSED, bool reply, long long > now) > { > struct conn_other *conn = conn_other_cast(conn_); > > @@ -54,7 +54,7 @@ other_conn_update(struct conn *conn_, struct dp_packet *pkt > OVS_UNUSED, > conn->state = OTHERS_MULTIPLE; > } > > - update_expiration(conn_, other_timeouts[conn->state], now); > + conn_update_expiration(ctb, &conn->up, other_timeouts[conn->state], > now); > > return CT_UPDATE_VALID; > } > @@ -66,14 +66,15 @@ other_valid_new(struct dp_packet *pkt OVS_UNUSED) > } > > static struct conn * > -other_new_conn(struct dp_packet *pkt
OVS_UNUSED, long long now) > +other_new_conn(struct conntrack_bucket *ctb, struct dp_packet *pkt > OVS_UNUSED, > + long long now) > { > struct conn_other *conn; > > conn = xzalloc(sizeof *conn); > conn->state = OTHERS_FIRST; > > - update_expiration(&conn->up, other_timeouts[conn->state], now); > + conn_init_expiration(ctb, &conn->up, other_timeouts[conn->state], now); > > return &conn->up; > } > diff --git a/lib/conntrack-private.h b/lib/conntrack-private.h > index d3e0099..4743dc6 100644 > --- a/lib/conntrack-private.h > +++ b/lib/conntrack-private.h > @@ -68,10 +68,13 @@ enum ct_update_res { > }; > > struct ct_l4_proto { > - struct conn *(*new_conn)(struct dp_packet *pkt, long long now); > + struct conn *(*new_conn)(struct conntrack_bucket *, struct dp_packet > *pkt, > + long long now); > bool (*valid_new)(struct dp_packet *pkt); > - enum ct_update_res (*conn_update)(struct conn *conn, struct dp_packet > *pkt, > - bool reply, long long now); > + enum ct_update_res (*conn_update)(struct conn *conn, > + struct conntrack_bucket *, > + struct dp_packet *pkt, bool reply, > + long long now); > }; > > extern struct ct_l4_proto ct_proto_tcp; > @@ -80,9 +83,19 @@ extern struct ct_l4_proto ct_proto_other; > extern long long ct_timeout_val[]; > > static inline void > -update_expiration(struct conn *conn, enum ct_timeout tm, long long now) > +conn_init_expiration(struct conntrack_bucket *ctb, struct conn *conn, > + enum ct_timeout tm, long long now) > { > conn->expiration = now + ct_timeout_val[tm]; > + ovs_list_push_back(&ctb->exp_lists[tm], &conn->exp_node); > +} > + > +static inline void > +conn_update_expiration(struct conntrack_bucket *ctb, struct conn *conn, > + enum ct_timeout tm, long long now) > +{ > + ovs_list_remove(&conn->exp_node); > + conn_init_expiration(ctb, conn, tm, now); > } > > #endif /* conntrack-private.h */ > diff --git a/lib/conntrack-tcp.c b/lib/conntrack-tcp.c > index b574eeb..71eadc1 100644 > --- a/lib/conntrack-tcp.c > +++ b/lib/conntrack-tcp.c > 
@@ -152,8 +152,8 @@ tcp_payload_length(struct dp_packet *pkt) > } > > static enum ct_update_res > -tcp_conn_update(struct conn* conn_, struct dp_packet *pkt, bool reply, > - long long now) > +tcp_conn_update(struct conn *conn_, struct conntrack_bucket *ctb, > + struct dp_packet *pkt, bool reply, long long now) > { > struct conn_tcp *conn = conn_tcp_cast(conn_); > struct tcp_header *tcp = dp_packet_l4(pkt); > @@ -319,18 +319,18 @@ tcp_conn_update(struct conn* conn_, struct dp_packet > *pkt, bool reply, > > if (src->state >= CT_DPIF_TCPS_FIN_WAIT_2 > && dst->state >= CT_DPIF_TCPS_FIN_WAIT_2) { > - update_expiration(conn_, CT_TM_TCP_CLOSED, now); > + conn_update_expiration(ctb, &conn->up, CT_TM_TCP_CLOSED, now); > } else if (src->state >= CT_DPIF_TCPS_CLOSING > && dst->state >= CT_DPIF_TCPS_CLOSING) { > - update_expiration(conn_, CT_TM_TCP_FIN_WAIT, now); > + conn_update_expiration(ctb, &conn->up, CT_TM_TCP_FIN_WAIT, now); > } else if (src->state < CT_DPIF_TCPS_ESTABLISHED > || dst->state < CT_DPIF_TCPS_ESTABLISHED) { > - update_expiration(conn_, now, CT_TM_TCP_OPENING); > + conn_update_expiration(ctb, &conn->up, CT_TM_TCP_OPENING, now); > } else if (src->state >= CT_DPIF_TCPS_CLOSING > || dst->state >= CT_DPIF_TCPS_CLOSING) { > - update_expiration(conn_, now, CT_TM_TCP_CLOSING); > + conn_update_expiration(ctb, &conn->up, CT_TM_TCP_CLOSING, now); > } else { > - update_expiration(conn_, now, CT_TM_TCP_ESTABLISHED); > + conn_update_expiration(ctb, &conn->up, CT_TM_TCP_ESTABLISHED, > now); > } > } else if ((dst->state < CT_DPIF_TCPS_SYN_SENT > || dst->state >= CT_DPIF_TCPS_FIN_WAIT_2 > @@ -414,7 +414,8 @@ tcp_valid_new(struct dp_packet *pkt) > } > > static struct conn * > -tcp_new_conn(struct dp_packet *pkt, long long now) > +tcp_new_conn(struct conntrack_bucket *ctb, struct dp_packet *pkt, > + long long now) > { > struct conn_tcp* newconn = NULL; > struct tcp_header *tcp = dp_packet_l4(pkt); > @@ -450,7 +451,8 @@ tcp_new_conn(struct dp_packet *pkt, long long now) > 
src->state = CT_DPIF_TCPS_SYN_SENT; > dst->state = CT_DPIF_TCPS_CLOSED; > > - update_expiration(&newconn->up, now, CT_TM_TCP_FIRST_PACKET); > + conn_init_expiration(ctb, &newconn->up, CT_TM_TCP_FIRST_PACKET, > + now); > > return &newconn->up; > } > diff --git a/lib/conntrack.c b/lib/conntrack.c > index 96935bc..5376550 100644 > --- a/lib/conntrack.c > +++ b/lib/conntrack.c > @@ -33,6 +33,8 @@ > #include "odp-netlink.h" > #include "openvswitch/vlog.h" > #include "ovs-rcu.h" > +#include "ovs-thread.h" > +#include "poll-loop.h" > #include "random.h" > #include "timeval.h" > > @@ -56,17 +58,20 @@ static void conn_key_lookup(struct conntrack_bucket *ctb, > struct conn_lookup_ctx *ctx, > long long now); > static bool valid_new(struct dp_packet *pkt, struct conn_key *); > -static struct conn *new_conn(struct dp_packet *pkt, struct conn_key *, > - long long now); > +static struct conn *new_conn(struct conntrack_bucket *, struct dp_packet > *pkt, > + struct conn_key *, long long now); > static void delete_conn(struct conn *); > -static enum ct_update_res conn_update(struct conn *, struct dp_packet*, > - bool reply, long long now); > +static enum ct_update_res conn_update(struct conn *, > + struct conntrack_bucket *ctb, > + struct dp_packet *, bool reply, > + long long now); > static bool conn_expired(struct conn *, long long now); > static void set_mark(struct dp_packet *, struct conn *, > uint32_t val, uint32_t mask); > static void set_label(struct dp_packet *, struct conn *, > const struct ovs_key_ct_labels *val, > const struct ovs_key_ct_labels *mask); > +static void *clean_thread_main(void *f_); > > static struct ct_l4_proto *l4_protos[] = { > [IPPROTO_TCP] = &ct_proto_tcp, > @@ -90,7 +95,8 @@ long long ct_timeout_val[] = { > void > conntrack_init(struct conntrack *ct) > { > - unsigned i; > + unsigned i, j; > + long long now = time_msec(); > > for (i = 0; i < CONNTRACK_BUCKETS; i++) { > struct conntrack_bucket *ctb = &ct->buckets[i]; > @@ -98,11 +104,20 @@ 
conntrack_init(struct conntrack *ct) > ct_lock_init(&ctb->lock); > ct_lock_lock(&ctb->lock); > hmap_init(&ctb->connections); > + for (j = 0; j < ARRAY_SIZE(ctb->exp_lists); j++) { > + ovs_list_init(&ctb->exp_lists[j]); > + } > ct_lock_unlock(&ctb->lock); > + ovs_mutex_init(&ctb->cleanup_mutex); > + ovs_mutex_lock(&ctb->cleanup_mutex); > + ctb->next_cleanup = now + CT_TM_MIN; > + ovs_mutex_unlock(&ctb->cleanup_mutex); > } > ct->hash_basis = random_uint32(); > atomic_count_init(&ct->n_conn, 0); > atomic_init(&ct->n_conn_limit, DEFAULT_N_CONN_LIMIT); > + latch_init(&ct->clean_thread_exit); > + ct->clean_thread = ovs_thread_create("ct_clean", clean_thread_main, ct); > } > > /* Destroys the connection tracker 'ct' and frees all the allocated memory. > */ > @@ -111,10 +126,14 @@ conntrack_destroy(struct conntrack *ct) > { > unsigned i; > > + latch_set(&ct->clean_thread_exit); > + pthread_join(ct->clean_thread, NULL); > + latch_destroy(&ct->clean_thread_exit); > for (i = 0; i < CONNTRACK_BUCKETS; i++) { > struct conntrack_bucket *ctb = &ct->buckets[i]; > struct conn *conn; > > + ovs_mutex_destroy(&ctb->cleanup_mutex); > ct_lock_lock(&ctb->lock);
I'm confused, is the order to grab cleanup_mutex then ctb->lock, or the other way around? (See also conntrack_clean() if this is the wrong order) > HMAP_FOR_EACH_POP(conn, node, &ctb->connections) { > atomic_count_dec(&ct->n_conn); > @@ -170,7 +189,7 @@ conn_not_found(struct conntrack *ct, struct dp_packet > *pkt, > return nc; > } > > - nc = new_conn(pkt, &ctx->key, now); > + nc = new_conn(&ct->buckets[bucket], pkt, &ctx->key, now); > > memcpy(&nc->rev_key, &ctx->key, sizeof nc->rev_key); > > @@ -200,7 +219,8 @@ process_one(struct conntrack *ct, struct dp_packet *pkt, > } else { > enum ct_update_res res; > > - res = conn_update(conn, pkt, ctx->reply, now); > + res = conn_update(conn, &ct->buckets[bucket], pkt, > + ctx->reply, now); > > switch (res) { > case CT_UPDATE_VALID: > @@ -213,6 +233,7 @@ process_one(struct conntrack *ct, struct dp_packet *pkt, > state |= CS_INVALID; > break; > case CT_UPDATE_NEW: > + ovs_list_remove(&conn->exp_node); > hmap_remove(&ct->buckets[bucket].connections, &conn->node); > atomic_count_dec(&ct->n_conn); > delete_conn(conn); > @@ -345,6 +366,143 @@ set_label(struct dp_packet *pkt, struct conn *conn, > conn->label = pkt->md.ct_label; > } > > +/* Delete the expired connections from 'ctb', up to 'limit'. Returns the > + * earliest expiration time among the remaining connections in 'ctb'. > Returns > + * LLONG_MAX if 'ctb' is empty. 
The return value might be smaller than > 'now', > + * if 'limit' is reached */ > +static long long > +sweep_bucket(struct conntrack *ct, struct conntrack_bucket *ctb, long long > now, > + size_t limit) > + OVS_REQUIRES(ctb->lock) > +{ > + struct conn *conn, *next; > + long long min_expiration = LLONG_MAX; > + unsigned i; > + size_t count = 0; > + > + for (i = 0; i < N_CT_TM; i++) { > + LIST_FOR_EACH_SAFE (conn, next, exp_node, &ctb->exp_lists[i]) { > + if (!conn_expired(conn, now) || count >= limit) { > + min_expiration = MIN(min_expiration, conn->expiration); > + if (count >= limit) { > + /* Do not check other lists. */ > + return min_expiration; Do you think we should have a coverage counter for this? > + } > + break; > + } > + ovs_list_remove(&conn->exp_node); > + hmap_remove(&ctb->connections, &conn->node); > + atomic_count_dec(&ct->n_conn); > + delete_conn(conn); > + count++; > + } > + } > + > + return min_expiration; > +} > + > +/* Cleans up old connection entries from 'ct'. Returns the time when the > + * next expiration might happen. The return value might be smaller than > + * 'now', meaning that an internal limit has been reached, and some expired > + * connections have not been deleted. */ > +static long long > +conntrack_clean(struct conntrack *ct, long long now) > +{ > + long long next_wakeup = now + CT_TM_MIN; > + unsigned int n_conn_limit; > + size_t clean_count = 0; > + unsigned i; > + > + atomic_read_relaxed(&ct->n_conn_limit, &n_conn_limit); > + > + for (i = 0; i < CONNTRACK_BUCKETS; i++) { > + struct conntrack_bucket *ctb = &ct->buckets[i]; > + size_t prev_count; > + long long min_exp; > + > + ovs_mutex_lock(&ctb->cleanup_mutex); > > + if (ctb->next_cleanup > now) { > + goto next_bucket; > + } > + > + ct_lock_lock(&ctb->lock); > + prev_count = hmap_count(&ctb->connections); > + /* If the connections are well distributed among buckets, we want to > + * limit to 10% of the global limit equally split among buckets. 
If > + * the bucket is busier than the others, we limit to 10% of its > + * current size. */ > + min_exp = sweep_bucket(ct, ctb, now, > + MAX(prev_count/10, n_conn_limit/(CONNTRACK_BUCKETS*10))); > + clean_count += prev_count - hmap_count(&ctb->connections); > + > + if (min_exp > now) { > + /* We call hmap_shrink() only if sweep_bucket() managed to delete > + * every expired connection. */ > + hmap_shrink(&ctb->connections); > + } > + > + ct_lock_unlock(&ctb->lock); > + > + ctb->next_cleanup = MIN(min_exp, now + CT_TM_MIN); > + > +next_bucket: > + next_wakeup = MIN(next_wakeup, ctb->next_cleanup); > + ovs_mutex_unlock(&ctb->cleanup_mutex); > + } > + > + VLOG_DBG("conntrack cleanup %"PRIuSIZE" entries in %lld msec", > + clean_count, time_msec() - now); > + > + return next_wakeup; > +} > + > +/* Cleanup: > + * > + * > + * We must call conntrack_clean() periodically. conntrack_clean() return > + * value gives an hint on when the next cleanup must be done (either because > + * there is an actual connection that expires, or because a new connection > + * might be created with the minimum timeout). > + * > + * The logic below has two goals: > + * > + * - Avoid calling conntrack_clean() too often. If we call conntrack_clean() > + * each time a connection expires, the thread will consume 100% CPU, so we > + * try to call the function _at most_ once every CT_CLEAN_INTERVAL, to > batch > + * removal. > + * > + * - On the other hand, it's not a good idea to keep the buckets locked for > + * too long, as we might prevent traffic from flowing. If > conntrack_clean() > + * returns a value which is in the past, it means that the internal limit > + * has been reached and more cleanup is required. In this case, just wait > + * CT_CLEAN_MIN_INTERVAL before the next call. 
> + */ > +#define CT_CLEAN_INTERVAL 5000 /* 5 seconds */ > +#define CT_CLEAN_MIN_INTERVAL 200 /* 0.2 seconds */ > + > +static void * > +clean_thread_main(void *f_) > +{ > + struct conntrack *ct = f_; > + > + while (!latch_is_set(&ct->clean_thread_exit)) { > + long long next_wake; > + long long now = time_msec(); > + > + next_wake = conntrack_clean(ct, now); > + > + if (next_wake < now) { > + poll_timer_wait_until(now + CT_CLEAN_MIN_INTERVAL); > + } else { > + poll_timer_wait_until(MAX(next_wake, now + CT_CLEAN_INTERVAL)); > + } > + latch_wait(&ct->clean_thread_exit); > + poll_block(); > + } Are the logs going to constantly complain that this thread sleeps for more than a second? > > + > + return NULL; > +} > + > /* Key extraction */ > > /* The function stores a pointer to the first byte after the header in > @@ -851,10 +1009,11 @@ conn_key_lookup(struct conntrack_bucket *ctb, > } > > static enum ct_update_res > -conn_update(struct conn *conn, struct dp_packet *pkt, bool reply, > - long long now) > +conn_update(struct conn *conn, struct conntrack_bucket *ctb, > + struct dp_packet *pkt, bool reply, long long now) > { > - return l4_protos[conn->key.nw_proto]->conn_update(conn, pkt, reply, now); > + return l4_protos[conn->key.nw_proto]->conn_update(conn, ctb, pkt, > + reply, now); > } > > static bool > @@ -870,11 +1029,12 @@ valid_new(struct dp_packet *pkt, struct conn_key *key) > } > > static struct conn * > -new_conn(struct dp_packet *pkt, struct conn_key *key, long long now) > +new_conn(struct conntrack_bucket *ctb, struct dp_packet *pkt, > + struct conn_key *key, long long now) > { > struct conn *newconn; > > - newconn = l4_protos[key->nw_proto]->new_conn(pkt, now); > + newconn = l4_protos[key->nw_proto]->new_conn(ctb, pkt, now); > > if (newconn) { > newconn->key = *key; > diff --git a/lib/conntrack.h b/lib/conntrack.h > index 54731bd..29bf4b7 100644 > --- a/lib/conntrack.h > +++ b/lib/conntrack.h > @@ -20,7 +20,9 @@ > #include <stdbool.h> > > #include "hmap.h" > 
+#include "latch.h" > #include "odp-netlink.h" > +#include "openvswitch/list.h" > #include "openvswitch/thread.h" > #include "openvswitch/types.h" > #include "ovs-atomic.h" > @@ -60,7 +62,6 @@ struct dp_packet_batch; > struct conntrack; > > void conntrack_init(struct conntrack *); > -void conntrack_run(struct conntrack *); > void conntrack_destroy(struct conntrack *); > > int conntrack_execute(struct conntrack *, struct dp_packet_batch *, > @@ -113,6 +114,10 @@ static inline void ct_lock_destroy(struct ct_lock *lock) > CT_TIMEOUT(OTHER_MULTIPLE, 60 * 1000) \ > CT_TIMEOUT(OTHER_BIDIR, 30 * 1000) \ > > +/* The smallest of the above values: it is used as an upper bound for the > + * interval between two rounds of cleanup of expired entries */ > +#define CT_TM_MIN (30 * 1000) Maybe #define it to the label of the minimum value above? > + > enum ct_timeout { > #define CT_TIMEOUT(NAME, VALUE) CT_TM_##NAME, > CT_TIMEOUTS > @@ -124,10 +129,29 @@ enum ct_timeout { > * > * The connections are kept in different buckets, which are completely > * independent. The connection bucket is determined by the hash of its key. > + * > + * Each bucket has two locks. Acquisition order is, from outermost to > + * innermost: > + * > + * lock > + * cleanup_mutex > + * > * */ > struct conntrack_bucket { > + /* Protects 'connections' and 'exp_lists'. Used in the fast path */ > struct ct_lock lock; > + /* Contains the connections in the bucket, indexed by key */ Indexed by 'struct conn_key'? > struct hmap connections OVS_GUARDED; > + /* For each possible timeout we have a list of connections. When the > + * timeout of a connection is updated, we move it to the back of the > list. > + * Since the connections in a list have the same relative timeout, the > list > + * will be ordered, with the oldest connections to the front. */ > + struct ovs_list exp_lists[N_CT_TM] OVS_GUARDED; > + > + /* Protects 'next_cleanup'. 
Used to make sure that there's only one > thread > + * performing the cleanup.
*/ > + struct ovs_mutex cleanup_mutex; I guess cleanup_mutex is mostly to keep Clang threadsafety analysis happy, because the main thread may destroy conns during conntrack_destroy()? > + long long next_cleanup OVS_GUARDED; > }; > > #define CONNTRACK_BUCKETS_SHIFT 8 > @@ -140,6 +164,12 @@ struct conntrack { > /* Salt for hashing a connection key. */ > uint32_t hash_basis; > > + /* The thread performing periodic cleanup of the connection > + * tracker */ > + pthread_t clean_thread; > + /* Latch to destroy the 'clean_thread' */ > + struct latch clean_thread_exit; > + > /* Number of connections currently in the connection tracker. */ > atomic_count n_conn; > /* Connections limit. When this limit is reached, no new connection > -- > 2.8.1 > _______________________________________________ dev mailing list dev@openvswitch.org http://openvswitch.org/mailman/listinfo/dev