On 24 June 2016 at 16:42, Daniele Di Proietto <diproiet...@vmware.com> wrote: > Thanks for your comments Joe, replies inline > > > > On 24/06/2016 15:29, "Joe Stringer" <j...@ovn.org> wrote: > >>On 10 June 2016 at 15:47, Daniele Di Proietto <diproiet...@vmware.com> wrote: >>> This commit adds a thread that periodically removes expired connections. >>> >>> The expiration time of a connection can be expressed by: >>> >>> expiration = now + timeout >>> >>> For each possible 'timeout' value (there aren't many) we keep a list. >>> When the expiration is updated, we move the connection to the back of the >>> corresponding 'timeout' list. This ways, the list is always ordered by >>> 'expiration'. >>> >>> When the cleanup thread iterates through the lists for expired >>> connections, it can stop at the first non expired connection. >>> >>> Suggested-by: Joe Stringer <j...@ovn.org> >>> Signed-off-by: Daniele Di Proietto <diproiet...@vmware.com> >>> --- >>> lib/conntrack-other.c | 11 +-- >>> lib/conntrack-private.h | 21 ++++-- >>> lib/conntrack-tcp.c | 20 +++--- >>> lib/conntrack.c | 184 >>> ++++++++++++++++++++++++++++++++++++++++++++---- >>> lib/conntrack.h | 32 ++++++++- >>> 5 files changed, 237 insertions(+), 31 deletions(-) >>> >>> diff --git a/lib/conntrack-other.c b/lib/conntrack-other.c >>> index 295cb2c..2920889 100644 >>> --- a/lib/conntrack-other.c >>> +++ b/lib/conntrack-other.c >>> @@ -43,8 +43,8 @@ conn_other_cast(const struct conn *conn) >>> } >>> >>> static enum ct_update_res >>> -other_conn_update(struct conn *conn_, struct dp_packet *pkt OVS_UNUSED, >>> - bool reply, long long now) >>> +other_conn_update(struct conn *conn_, struct conntrack_bucket *ctb, >>> + struct dp_packet *pkt OVS_UNUSED, bool reply, long long >>> now) >>> { >>> struct conn_other *conn = conn_other_cast(conn_); >>> >>> @@ -54,7 +54,7 @@ other_conn_update(struct conn *conn_, struct dp_packet >>> *pkt OVS_UNUSED, >>> conn->state = OTHERS_MULTIPLE; >>> } >>> >>> - update_expiration(conn_, 
other_timeouts[conn->state], now); >>> + conn_update_expiration(ctb, &conn->up, other_timeouts[conn->state], >>> now); >>> >>> return CT_UPDATE_VALID; >>> } >>> @@ -66,14 +66,15 @@ other_valid_new(struct dp_packet *pkt OVS_UNUSED) >>> } >>> >>> static struct conn * >>> -other_new_conn(struct dp_packet *pkt OVS_UNUSED, long long now) >>> +other_new_conn(struct conntrack_bucket *ctb, struct dp_packet *pkt >>> OVS_UNUSED, >>> + long long now) >>> { >>> struct conn_other *conn; >>> >>> conn = xzalloc(sizeof *conn); >>> conn->state = OTHERS_FIRST; >>> >>> - update_expiration(&conn->up, other_timeouts[conn->state], now); >>> + conn_init_expiration(ctb, &conn->up, other_timeouts[conn->state], now); >>> >>> return &conn->up; >>> } >>> diff --git a/lib/conntrack-private.h b/lib/conntrack-private.h >>> index d3e0099..4743dc6 100644 >>> --- a/lib/conntrack-private.h >>> +++ b/lib/conntrack-private.h >>> @@ -68,10 +68,13 @@ enum ct_update_res { >>> }; >>> >>> struct ct_l4_proto { >>> - struct conn *(*new_conn)(struct dp_packet *pkt, long long now); >>> + struct conn *(*new_conn)(struct conntrack_bucket *, struct dp_packet >>> *pkt, >>> + long long now); >>> bool (*valid_new)(struct dp_packet *pkt); >>> - enum ct_update_res (*conn_update)(struct conn *conn, struct dp_packet >>> *pkt, >>> - bool reply, long long now); >>> + enum ct_update_res (*conn_update)(struct conn *conn, >>> + struct conntrack_bucket *, >>> + struct dp_packet *pkt, bool reply, >>> + long long now); >>> }; >>> >>> extern struct ct_l4_proto ct_proto_tcp; >>> @@ -80,9 +83,19 @@ extern struct ct_l4_proto ct_proto_other; >>> extern long long ct_timeout_val[]; >>> >>> static inline void >>> -update_expiration(struct conn *conn, enum ct_timeout tm, long long now) >>> +conn_init_expiration(struct conntrack_bucket *ctb, struct conn *conn, >>> + enum ct_timeout tm, long long now) >>> { >>> conn->expiration = now + ct_timeout_val[tm]; >>> + ovs_list_push_back(&ctb->exp_lists[tm], &conn->exp_node); >>> +} >>> + >>>
+static inline void >>> +conn_update_expiration(struct conntrack_bucket *ctb, struct conn *conn, >>> + enum ct_timeout tm, long long now) >>> +{ >>> + ovs_list_remove(&conn->exp_node); >>> + conn_init_expiration(ctb, conn, tm, now); >>> } >>> >>> #endif /* conntrack-private.h */ >>> diff --git a/lib/conntrack-tcp.c b/lib/conntrack-tcp.c >>> index b574eeb..71eadc1 100644 >>> --- a/lib/conntrack-tcp.c >>> +++ b/lib/conntrack-tcp.c >>> @@ -152,8 +152,8 @@ tcp_payload_length(struct dp_packet *pkt) >>> } >>> >>> static enum ct_update_res >>> -tcp_conn_update(struct conn* conn_, struct dp_packet *pkt, bool reply, >>> - long long now) >>> +tcp_conn_update(struct conn *conn_, struct conntrack_bucket *ctb, >>> + struct dp_packet *pkt, bool reply, long long now) >>> { >>> struct conn_tcp *conn = conn_tcp_cast(conn_); >>> struct tcp_header *tcp = dp_packet_l4(pkt); >>> @@ -319,18 +319,18 @@ tcp_conn_update(struct conn* conn_, struct dp_packet >>> *pkt, bool reply, >>> >>> if (src->state >= CT_DPIF_TCPS_FIN_WAIT_2 >>> && dst->state >= CT_DPIF_TCPS_FIN_WAIT_2) { >>> - update_expiration(conn_, CT_TM_TCP_CLOSED, now); >>> + conn_update_expiration(ctb, &conn->up, CT_TM_TCP_CLOSED, now); >>> } else if (src->state >= CT_DPIF_TCPS_CLOSING >>> && dst->state >= CT_DPIF_TCPS_CLOSING) { >>> - update_expiration(conn_, CT_TM_TCP_FIN_WAIT, now); >>> + conn_update_expiration(ctb, &conn->up, CT_TM_TCP_FIN_WAIT, >>> now); >>> } else if (src->state < CT_DPIF_TCPS_ESTABLISHED >>> || dst->state < CT_DPIF_TCPS_ESTABLISHED) { >>> - update_expiration(conn_, now, CT_TM_TCP_OPENING); >>> + conn_update_expiration(ctb, &conn->up, CT_TM_TCP_OPENING, now); >>> } else if (src->state >= CT_DPIF_TCPS_CLOSING >>> || dst->state >= CT_DPIF_TCPS_CLOSING) { >>> - update_expiration(conn_, now, CT_TM_TCP_CLOSING); >>> + conn_update_expiration(ctb, &conn->up, CT_TM_TCP_CLOSING, now); >>> } else { >>> - update_expiration(conn_, now, CT_TM_TCP_ESTABLISHED); >>> + conn_update_expiration(ctb, &conn->up, 
CT_TM_TCP_ESTABLISHED, >>> now); >>> } >>> } else if ((dst->state < CT_DPIF_TCPS_SYN_SENT >>> || dst->state >= CT_DPIF_TCPS_FIN_WAIT_2 >>> @@ -414,7 +414,8 @@ tcp_valid_new(struct dp_packet *pkt) >>> } >>> >>> static struct conn * >>> -tcp_new_conn(struct dp_packet *pkt, long long now) >>> +tcp_new_conn(struct conntrack_bucket *ctb, struct dp_packet *pkt, >>> + long long now) >>> { >>> struct conn_tcp* newconn = NULL; >>> struct tcp_header *tcp = dp_packet_l4(pkt); >>> @@ -450,7 +451,8 @@ tcp_new_conn(struct dp_packet *pkt, long long now) >>> src->state = CT_DPIF_TCPS_SYN_SENT; >>> dst->state = CT_DPIF_TCPS_CLOSED; >>> >>> - update_expiration(&newconn->up, now, CT_TM_TCP_FIRST_PACKET); >>> + conn_init_expiration(ctb, &newconn->up, CT_TM_TCP_FIRST_PACKET, >>> + now); >>> >>> return &newconn->up; >>> } >>> diff --git a/lib/conntrack.c b/lib/conntrack.c >>> index 96935bc..5376550 100644 >>> --- a/lib/conntrack.c >>> +++ b/lib/conntrack.c >>> @@ -33,6 +33,8 @@ >>> #include "odp-netlink.h" >>> #include "openvswitch/vlog.h" >>> #include "ovs-rcu.h" >>> +#include "ovs-thread.h" >>> +#include "poll-loop.h" >>> #include "random.h" >>> #include "timeval.h" >>> >>> @@ -56,17 +58,20 @@ static void conn_key_lookup(struct conntrack_bucket >>> *ctb, >>> struct conn_lookup_ctx *ctx, >>> long long now); >>> static bool valid_new(struct dp_packet *pkt, struct conn_key *); >>> -static struct conn *new_conn(struct dp_packet *pkt, struct conn_key *, >>> - long long now); >>> +static struct conn *new_conn(struct conntrack_bucket *, struct dp_packet >>> *pkt, >>> + struct conn_key *, long long now); >>> static void delete_conn(struct conn *); >>> -static enum ct_update_res conn_update(struct conn *, struct dp_packet*, >>> - bool reply, long long now); >>> +static enum ct_update_res conn_update(struct conn *, >>> + struct conntrack_bucket *ctb, >>> + struct dp_packet *, bool reply, >>> + long long now); >>> static bool conn_expired(struct conn *, long long now); >>> static void 
set_mark(struct dp_packet *, struct conn *, >>> uint32_t val, uint32_t mask); >>> static void set_label(struct dp_packet *, struct conn *, >>> const struct ovs_key_ct_labels *val, >>> const struct ovs_key_ct_labels *mask); >>> +static void *clean_thread_main(void *f_); >>> >>> static struct ct_l4_proto *l4_protos[] = { >>> [IPPROTO_TCP] = &ct_proto_tcp, >>> @@ -90,7 +95,8 @@ long long ct_timeout_val[] = { >>> void >>> conntrack_init(struct conntrack *ct) >>> { >>> - unsigned i; >>> + unsigned i, j; >>> + long long now = time_msec(); >>> >>> for (i = 0; i < CONNTRACK_BUCKETS; i++) { >>> struct conntrack_bucket *ctb = &ct->buckets[i]; >>> @@ -98,11 +104,20 @@ conntrack_init(struct conntrack *ct) >>> ct_lock_init(&ctb->lock); >>> ct_lock_lock(&ctb->lock); >>> hmap_init(&ctb->connections); >>> + for (j = 0; j < ARRAY_SIZE(ctb->exp_lists); j++) { >>> + ovs_list_init(&ctb->exp_lists[j]); >>> + } >>> ct_lock_unlock(&ctb->lock); >>> + ovs_mutex_init(&ctb->cleanup_mutex); >>> + ovs_mutex_lock(&ctb->cleanup_mutex); >>> + ctb->next_cleanup = now + CT_TM_MIN; >>> + ovs_mutex_unlock(&ctb->cleanup_mutex); >>> } >>> ct->hash_basis = random_uint32(); >>> atomic_count_init(&ct->n_conn, 0); >>> atomic_init(&ct->n_conn_limit, DEFAULT_N_CONN_LIMIT); >>> + latch_init(&ct->clean_thread_exit); >>> + ct->clean_thread = ovs_thread_create("ct_clean", clean_thread_main, >>> ct); >>> } >>> >>> /* Destroys the connection tracker 'ct' and frees all the allocated >>> memory. */ >>> @@ -111,10 +126,14 @@ conntrack_destroy(struct conntrack *ct) >>> { >>> unsigned i; >>> >>> + latch_set(&ct->clean_thread_exit); >>> + pthread_join(ct->clean_thread, NULL); >>> + latch_destroy(&ct->clean_thread_exit); >>> for (i = 0; i < CONNTRACK_BUCKETS; i++) { >>> struct conntrack_bucket *ctb = &ct->buckets[i]; >>> struct conn *conn; >>> >>> + ovs_mutex_destroy(&ctb->cleanup_mutex); >>> ct_lock_lock(&ctb->lock); >> >>I'm confused, is the order to grab cleanup_mutex then ctb->lock, or >>the other way around? 
(See also conntrack_clean() if this is the wrong >>order) > > Well, this is not grabbing the mutex, just destroying it. Does it need to > be in a particular order? I chose this order because it's the reverse of > what conntrack_init() does. > > There's only one place where 'lock' and 'cleanup_mutex' nest, and this is > conntrack_clean(). > > I've actually tried to document the acquisition order in conntrack.h, but > it's inverted. Sorry about this.
Ah, okay. That's the source of my confusion. > I'll fix the acquisition order in conntrack.h, agreed? That sounds fine. >> >>> HMAP_FOR_EACH_POP(conn, node, &ctb->connections) { >>> atomic_count_dec(&ct->n_conn); >>> @@ -170,7 +189,7 @@ conn_not_found(struct conntrack *ct, struct dp_packet >>> *pkt, >>> return nc; >>> } >>> >>> - nc = new_conn(pkt, &ctx->key, now); >>> + nc = new_conn(&ct->buckets[bucket], pkt, &ctx->key, now); >>> >>> memcpy(&nc->rev_key, &ctx->key, sizeof nc->rev_key); >>> >>> @@ -200,7 +219,8 @@ process_one(struct conntrack *ct, struct dp_packet *pkt, >>> } else { >>> enum ct_update_res res; >>> >>> - res = conn_update(conn, pkt, ctx->reply, now); >>> + res = conn_update(conn, &ct->buckets[bucket], pkt, >>> + ctx->reply, now); >>> >>> switch (res) { >>> case CT_UPDATE_VALID: >>> @@ -213,6 +233,7 @@ process_one(struct conntrack *ct, struct dp_packet *pkt, >>> state |= CS_INVALID; >>> break; >>> case CT_UPDATE_NEW: >>> + ovs_list_remove(&conn->exp_node); >>> hmap_remove(&ct->buckets[bucket].connections, &conn->node); >>> atomic_count_dec(&ct->n_conn); >>> delete_conn(conn); >>> @@ -345,6 +366,143 @@ set_label(struct dp_packet *pkt, struct conn *conn, >>> conn->label = pkt->md.ct_label; >>> } >>> >>> +/* Delete the expired connections from 'ctb', up to 'limit'. Returns the >>> + * earliest expiration time among the remaining connections in 'ctb'. >>> Returns >>> + * LLONG_MAX if 'ctb' is empty. 
The return value might be smaller than >>> 'now', >>> + * if 'limit' is reached */ >>> +static long long >>> +sweep_bucket(struct conntrack *ct, struct conntrack_bucket *ctb, long long >>> now, >>> + size_t limit) >>> + OVS_REQUIRES(ctb->lock) >>> +{ >>> + struct conn *conn, *next; >>> + long long min_expiration = LLONG_MAX; >>> + unsigned i; >>> + size_t count = 0; >>> + >>> + for (i = 0; i < N_CT_TM; i++) { >>> + LIST_FOR_EACH_SAFE (conn, next, exp_node, &ctb->exp_lists[i]) { >>> + if (!conn_expired(conn, now) || count >= limit) { >>> + min_expiration = MIN(min_expiration, conn->expiration); >>> + if (count >= limit) { >>> + /* Do not check other lists. */ >>> + return min_expiration; >> >>Do you think we should have a coverage counter for this? > > Good idea, I'll add that, thanks. > >> >>> + } >>> + break; >>> + } >>> + ovs_list_remove(&conn->exp_node); >>> + hmap_remove(&ctb->connections, &conn->node); >>> + atomic_count_dec(&ct->n_conn); >>> + delete_conn(conn); >>> + count++; >>> + } >>> + } >>> + >>> + return min_expiration; >>> +} >>> + >>> +/* Cleans up old connection entries from 'ct'. Returns the time when the >>> + * next expiration might happen. The return value might be smaller than >>> + * 'now', meaning that an internal limit has been reached, and some expired >>> + * connections have not been deleted. 
*/ >>> +static long long >>> +conntrack_clean(struct conntrack *ct, long long now) >>> +{ >>> + long long next_wakeup = now + CT_TM_MIN; >>> + unsigned int n_conn_limit; >>> + size_t clean_count = 0; >>> + unsigned i; >>> + >>> + atomic_read_relaxed(&ct->n_conn_limit, &n_conn_limit); >>> + >>> + for (i = 0; i < CONNTRACK_BUCKETS; i++) { >>> + struct conntrack_bucket *ctb = &ct->buckets[i]; >>> + size_t prev_count; >>> + long long min_exp; >>> + >>> + ovs_mutex_lock(&ctb->cleanup_mutex); >>> >>> + if (ctb->next_cleanup > now) { >>> + goto next_bucket; >>> + } >>> + >>> + ct_lock_lock(&ctb->lock); >>> + prev_count = hmap_count(&ctb->connections); >>> + /* If the connections are well distributed among buckets, we want >>> to >>> + * limit to 10% of the global limit equally split among buckets. If >>> + * the bucket is busier than the others, we limit to 10% of its >>> + * current size. */ >>> + min_exp = sweep_bucket(ct, ctb, now, >>> + MAX(prev_count/10, n_conn_limit/(CONNTRACK_BUCKETS*10))); >>> + clean_count += prev_count - hmap_count(&ctb->connections); >>> + >>> + if (min_exp > now) { >>> + /* We call hmap_shrink() only if sweep_bucket() managed to >>> delete >>> + * every expired connection. */ >>> + hmap_shrink(&ctb->connections); >>> + } >>> + >>> + ct_lock_unlock(&ctb->lock); >>> + >>> + ctb->next_cleanup = MIN(min_exp, now + CT_TM_MIN); >>> + >>> +next_bucket: >>> + next_wakeup = MIN(next_wakeup, ctb->next_cleanup); >>> + ovs_mutex_unlock(&ctb->cleanup_mutex); >>> + } >>> + >>> + VLOG_DBG("conntrack cleanup %"PRIuSIZE" entries in %lld msec", >>> + clean_count, time_msec() - now); >>> + >>> + return next_wakeup; >>> +} >>> + >>> +/* Cleanup: >>> + * >>> + * >>> + * We must call conntrack_clean() periodically. conntrack_clean()'s return >>> + * value gives a hint on when the next cleanup must be done (either >>> because >>> + * there is an actual connection that expires, or because a new connection >>> + * might be created with the minimum timeout).
>>> + * >>> + * The logic below has two goals: >>> + * >>> + * - Avoid calling conntrack_clean() too often. If we call >>> conntrack_clean() >>> + * each time a connection expires, the thread will consume 100% CPU, so >>> we >>> + * try to call the function _at most_ once every CT_CLEAN_INTERVAL, to >>> batch >>> + * removal. >>> + * >>> + * - On the other hand, it's not a good idea to keep the buckets locked for >>> + * too long, as we might prevent traffic from flowing. If >>> conntrack_clean() >>> + * returns a value which is in the past, it means that the internal limit >>> + * has been reached and more cleanup is required. In this case, just >>> wait >>> + * CT_CLEAN_MIN_INTERVAL before the next call. >>> + */ >>> +#define CT_CLEAN_INTERVAL 5000 /* 5 seconds */ >>> +#define CT_CLEAN_MIN_INTERVAL 200 /* 0.2 seconds */ >>> + >>> +static void * >>> +clean_thread_main(void *f_) >>> +{ >>> + struct conntrack *ct = f_; >>> + >>> + while (!latch_is_set(&ct->clean_thread_exit)) { >>> + long long next_wake; >>> + long long now = time_msec(); >>> + >>> + next_wake = conntrack_clean(ct, now); >>> + >>> + if (next_wake < now) { >>> + poll_timer_wait_until(now + CT_CLEAN_MIN_INTERVAL); >>> + } else { >>> + poll_timer_wait_until(MAX(next_wake, now + CT_CLEAN_INTERVAL)); >>> + } >>> + latch_wait(&ct->clean_thread_exit); >>> + poll_block(); >>> + } >> >>Are the logs going to constantly complain that this thread sleeps for >>more than a second? > > poll_timer_wait_until() arranges for the next poll_block to return after the > interval. The thread will be sleeping inside the poll_block, there should > be no warning (unless conntrack_clean takes more than 1s, but in that case > a warning is appropriate). Ah, I was mistaken about when the "last_wakeup" was taken. OK. >>> struct hmap connections OVS_GUARDED; >>> + /* For each possible timeout we have a list of connections. When the >>> + * timeout of a connection is updated, we move it to the back of the >>> list. 
>>> + * Since the connections in a list have the same relative timeout, the >>> list >>> + * will be ordered, with the oldest connections at the front. */ >>> + struct ovs_list exp_lists[N_CT_TM] OVS_GUARDED; >>> + >>> + /* Protects 'next_cleanup'. Used to make sure that there's only one >>> thread >>> + * performing the cleanup. */ >>> + struct ovs_mutex cleanup_mutex; >> >>I guess cleanup_mutex is mostly to keep Clang threadsafety analysis >>happy, because the main thread may destroy conns during >>conntrack_destroy()? > > Yes, that mutex is acquired by the cleanup thread only (except for the main > thread > during init and destroy). It's pretty much useless, except that I find it > better > to have a mutex, rather than documenting that "'next_cleanup' should only be > accessed > by the cleanup thread". > > I can remove it if you prefer. No, that's fine as-is. Sometimes I put a comment above things like that to say "/* Appease clang threadsafety analyser */". _______________________________________________ dev mailing list dev@openvswitch.org http://openvswitch.org/mailman/listinfo/dev