Le 07/03/2018 à 09:58, Christopher Faulet a écrit :
I found thread-safety bugs about the management of pending connections.
It is totally fucked up :) It needs to be entirely reworked. I'm on it.
I hope to propose a patch this afternoon.
Hi,
Sorry for the lag. This issue was definitely harder to fix than I
thought at first glance. Here is a proposal to fix the queue management.
Could you check whether it fixes your bug, please?
--
Christopher Faulet
>From 32926ada6a00c11980182d582a521d3833e36f23 Mon Sep 17 00:00:00 2001
From: Christopher Faulet <[email protected]>
Date: Wed, 14 Mar 2018 16:18:06 +0100
Subject: [PATCH] BUG/MAJOR: thread/queue: Fix thread-safety issues on the
queues management
The management of the server and proxy queues was not thread-safe at
all. First, the accesses to <strm>->pend_pos were not protected. So it was
possible to release it on one thread (for instance because the stream is released)
while using it at the same time on another one (because we redispatch pending
connections for a server). Second, the stream's information (flags and target)
was modified from outside the stream, which must never happen. To be safe, the
stream's state must always be updated in the context of process_stream.
So to fix these issues, the queue module has been refactored. A lock has been
added to the pendconn structure. And now, when we try to dequeue a pending
connection, we start by unlinking it from the server/proxy queue and we wake up
the stream. Then, it is the stream's responsibility to actually dequeue it (or
release it). This way, we are sure that only the stream can create and release
its <pend_pos> field.
However, be careful. This new implementation should be thread-safe
(hopefully...). But it is not optimal and, in some situations, it could be much
slower in multi-threaded mode than in single-threaded mode. The problem is that,
when we try to dequeue pending connections, we process them from the oldest one to
the newest one regardless of the threads' affinity. So we need to wait for the
other threads to wake up before the connections are really processed. If threads
are blocked in the poller, this will add significant latency. This problem happens
when maxconn values are very low.
This patch must be backported to 1.8.
---
include/common/hathreads.h | 2 +
include/proto/queue.h | 1 +
include/types/queue.h | 9 +-
include/types/stream.h | 2 +-
src/proto_http.c | 2 -
src/queue.c | 283 +++++++++++++++++++++++++++------------------
src/stream.c | 3 +-
7 files changed, 180 insertions(+), 122 deletions(-)
diff --git a/include/common/hathreads.h b/include/common/hathreads.h
index 3da6dba4..19299db7 100644
--- a/include/common/hathreads.h
+++ b/include/common/hathreads.h
@@ -240,6 +240,7 @@ enum lock_label {
PIPES_LOCK,
START_LOCK,
TLSKEYS_REF_LOCK,
+ PENDCONN_LOCK,
LOCK_LABELS
};
struct lock_stat {
@@ -360,6 +361,7 @@ static inline const char *lock_label(enum lock_label label)
case PIPES_LOCK: return "PIPES";
case START_LOCK: return "START";
case TLSKEYS_REF_LOCK: return "TLSKEYS_REF";
+ case PENDCONN_LOCK: return "PENDCONN";
case LOCK_LABELS: break; /* keep compiler happy */
};
/* only way to come here is consecutive to an internal bug */
diff --git a/include/proto/queue.h b/include/proto/queue.h
index f66d809f..2d4773a0 100644
--- a/include/proto/queue.h
+++ b/include/proto/queue.h
@@ -38,6 +38,7 @@ extern struct pool_head *pool_head_pendconn;
int init_pendconn();
struct pendconn *pendconn_add(struct stream *strm);
+int pendconn_dequeue(struct stream *strm);
void pendconn_free(struct pendconn *p);
void process_srv_queue(struct server *s);
unsigned int srv_dynamic_maxconn(const struct server *s);
diff --git a/include/types/queue.h b/include/types/queue.h
index 4b354514..3e7d5a12 100644
--- a/include/types/queue.h
+++ b/include/types/queue.h
@@ -24,15 +24,18 @@
#include <common/config.h>
#include <common/mini-clist.h>
+#include <common/hathreads.h>
#include <types/server.h>
struct stream;
struct pendconn {
- struct list list; /* chaining ... */
- struct stream *strm; /* the stream waiting for a connection */
- struct server *srv; /* the server we are waiting for */
+ int strm_flags; /* copy of the stream's flags, updated by dequeuers and applied by pendconn_dequeue() */
+ struct stream *strm; /* the stream waiting for a connection */
+ struct server *srv; /* the server we are queued on or were assigned to, NULL while queued on the proxy */
+ struct list list; /* element of a server/proxy pendconns list, empty once unlinked */
+ __decl_hathreads(HA_SPINLOCK_T lock); /* taken to unlink, dequeue or release this pendconn */
};
#endif /* _TYPES_QUEUE_H */
diff --git a/include/types/stream.h b/include/types/stream.h
index 227b0ffb..0dbc79f4 100644
--- a/include/types/stream.h
+++ b/include/types/stream.h
@@ -124,7 +124,7 @@ struct stream {
struct session *sess; /* the session this stream is attached to */
struct server *srv_conn; /* stream already has a slot on a server and is not in queue */
- struct pendconn *pend_pos; /* if not NULL, points to the position in the pending queue */
+ struct pendconn *pend_pos; /* if not NULL, points to the pending position in the pending queue */
struct http_txn *txn; /* current HTTP transaction being processed. Should become a list. */
diff --git a/src/proto_http.c b/src/proto_http.c
index 29880eaa..1a9adc1d 100644
--- a/src/proto_http.c
+++ b/src/proto_http.c
@@ -8251,8 +8251,6 @@ void http_reset_txn(struct stream *s)
s->store_count = 0;
s->uniq_id = global.req_count++;
- s->pend_pos = NULL;
-
s->req.flags |= CF_READ_DONTWAIT; /* one read is usually enough */
/* We must trim any excess data from the response buffer, because we
diff --git a/src/queue.c b/src/queue.c
index 1dea7d53..34604661 100644
--- a/src/queue.c
+++ b/src/queue.c
@@ -24,7 +24,7 @@
struct pool_head *pool_head_pendconn;
-static void __pendconn_free(struct pendconn *p);
+static void pendconn_unlinked(struct pendconn *p);
/* perform minimal intializations, report 0 in case of error, 1 if OK. */
int init_pendconn()
@@ -63,28 +63,7 @@ unsigned int srv_dynamic_maxconn(const struct server *s)
return max;
}
-
-/* Returns the first pending connection for server <s>, which may be NULL if
- * nothing is pending.
- */
-static inline struct pendconn *pendconn_from_srv(const struct server *s) {
- if (!s->nbpend)
- return NULL;
- return LIST_ELEM(s->pendconns.n, struct pendconn *, list);
-}
-
-/* Returns the first pending connection for proxy <px>, which may be NULL if
- * nothing is pending.
- */
-static inline struct pendconn *pendconn_from_px(const struct proxy *px) {
- if (!px->nbpend)
- return NULL;
-
- return LIST_ELEM(px->pendconns.n, struct pendconn *, list);
-}
-
-
-/* Detaches the next pending connection from either a server or a proxy, and
+/* Process the next pending connection from either a server or a proxy, and
* returns its associated stream. If no pending connection is found, NULL is
* returned. Note that neither <srv> nor <px> may be NULL.
* Priority is given to the oldest request in the queue if both <srv> and <px>
@@ -97,44 +76,70 @@ static inline struct pendconn *pendconn_from_px(const struct proxy *px) {
- * The stream is immediately marked as "assigned", and both its <srv> and
- * <srv_conn> are set to <srv>,
+ * The pendconn is unlinked, the stream's <srv_conn> is set to <srv> and the
+ * stream is woken up; its target and flags are updated by pendconn_dequeue().
*/
-static struct stream *pendconn_get_next_strm(struct server *srv, struct proxy *px)
+static struct stream *pendconn_process_next_strm(struct server *srv, struct proxy *px)
{
- struct pendconn *ps, *pp;
struct stream *strm;
- struct server *rsrv;
+ struct pendconn *p = NULL;
+ struct server *rsrv;
rsrv = srv->track;
if (!rsrv)
rsrv = srv;
- ps = pendconn_from_srv(srv);
- pp = pendconn_from_px(px);
- /* we want to get the definitive pendconn in <ps> */
- if (!pp || !srv_currently_usable(rsrv)) {
- if (!ps)
- return NULL;
- } else {
- /* pendconn exists in the proxy queue */
- if (!ps || tv_islt(&pp->strm->logs.tv_request, &ps->strm->logs.tv_request))
- ps = pp;
+ if (srv->nbpend) {
+ list_for_each_entry(p, &srv->pendconns, list) {
+ if (!HA_SPIN_TRYLOCK(PENDCONN_LOCK, &p->lock))
+ goto ps_found;
+ }
+ p = NULL;
+ }
+
+ ps_found:
+ if (srv_currently_usable(rsrv) && px->nbpend) {
+ struct pendconn *pp;
+
+ list_for_each_entry(pp, &px->pendconns, list) {
+ /* If the server pendconn is older than the proxy one,
+ * we process the server one. */
+ if (p && !tv_islt(&pp->strm->logs.tv_request, &p->strm->logs.tv_request))
+ goto pendconn_found;
+
+ if (!HA_SPIN_TRYLOCK(PENDCONN_LOCK, &pp->lock)) {
+ /* Let's switch from the server pendconn to the
+ * proxy pendconn. Don't forget to unlock the
+ * server pendconn, if any. */
+ if (p)
+ HA_SPIN_UNLOCK(PENDCONN_LOCK, &p->lock);
+ p = pp;
+ goto pendconn_found;
+ }
+ }
}
- strm = ps->strm;
- __pendconn_free(ps);
- /* we want to note that the stream has now been assigned a server */
- strm->flags |= SF_ASSIGNED;
- strm->target = &srv->obj_type;
- __stream_add_srv_conn(strm, srv);
+ if (!p)
+ return NULL;
+
+ pendconn_found:
+ pendconn_unlinked(p);
+ p->strm_flags = (p->strm_flags & (SF_DIRECT | SF_ADDR_SET));
+ p->strm_flags |= SF_ASSIGNED;
+ p->srv = srv;
+
HA_ATOMIC_ADD(&srv->served, 1);
HA_ATOMIC_ADD(&srv->proxy->served, 1);
if (px->lbprm.server_take_conn)
px->lbprm.server_take_conn(srv);
+ __stream_add_srv_conn(p->strm, srv);
- return strm;
+ /* Once unlocked, <p> (and even its stream) may be released by the stream's
+ * thread: save the pointer first; callers only test it against NULL. */
+ strm = p->strm;
+ task_wakeup(strm->task, TASK_WOKEN_RES);
+ HA_SPIN_UNLOCK(PENDCONN_LOCK, &p->lock);
+ return strm;
}
-/*
- * Manages a server's connection queue. This function will try to dequeue as
+/* Manages a server's connection queue. This function will try to dequeue as
* many pending streams as possible, and wake them up.
*/
void process_srv_queue(struct server *s)
@@ -144,17 +145,10 @@ void process_srv_queue(struct server *s)
HA_SPIN_LOCK(PROXY_LOCK, &p->lock);
HA_SPIN_LOCK(SERVER_LOCK, &s->lock);
-
- /* First, check if we can handle some connections queued at the proxy. We
- * will take as many as we can handle.
- */
maxconn = srv_dynamic_maxconn(s);
while (s->served < maxconn) {
- struct stream *strm = pendconn_get_next_strm(s, p);
-
- if (strm == NULL)
+ if (!pendconn_process_next_strm(s, p))
break;
- task_wakeup(strm->task, TASK_WOKEN_RES);
}
HA_SPIN_UNLOCK(SERVER_LOCK, &s->lock);
HA_SPIN_UNLOCK(PROXY_LOCK, &p->lock);
@@ -169,35 +163,39 @@ void process_srv_queue(struct server *s)
struct pendconn *pendconn_add(struct stream *strm)
{
struct pendconn *p;
- struct server *srv;
- int count;
+ struct server *srv;
p = pool_alloc(pool_head_pendconn);
if (!p)
return NULL;
- strm->pend_pos = p;
- p->strm = strm;
- srv = objt_server(strm->target);
+ p->srv = NULL;
+ p->strm = strm;
+ p->strm_flags = strm->flags; /* updated by dequeuers, applied by pendconn_dequeue() */
+ HA_SPIN_INIT(&p->lock);
+ srv = objt_server(strm->target);
if ((strm->flags & SF_ASSIGNED) && srv) {
p->srv = srv;
HA_SPIN_LOCK(SERVER_LOCK, &srv->lock);
+ srv->nbpend++; /* counters are updated under the queue's lock */
+ strm->logs.srv_queue_size += srv->nbpend;
+ if (srv->nbpend > srv->counters.nbpend_max)
+ srv->counters.nbpend_max = srv->nbpend;
LIST_ADDQ(&srv->pendconns, &p->list);
HA_SPIN_UNLOCK(SERVER_LOCK, &srv->lock);
- count = HA_ATOMIC_ADD(&srv->nbpend, 1);
- strm->logs.srv_queue_size += count;
- HA_ATOMIC_UPDATE_MAX(&srv->counters.nbpend_max, count);
- } else {
- p->srv = NULL;
+ }
+ else {
HA_SPIN_LOCK(PROXY_LOCK, &strm->be->lock);
+ strm->be->nbpend++; /* counters are updated under the queue's lock */
+ strm->logs.prx_queue_size += strm->be->nbpend;
+ if (strm->be->nbpend > strm->be->be_counters.nbpend_max)
+ strm->be->be_counters.nbpend_max = strm->be->nbpend;
LIST_ADDQ(&strm->be->pendconns, &p->list);
HA_SPIN_UNLOCK(PROXY_LOCK, &strm->be->lock);
- count = HA_ATOMIC_ADD(&strm->be->nbpend, 1);
- strm->logs.prx_queue_size += count;
- HA_ATOMIC_UPDATE_MAX(&strm->be->be_counters.nbpend_max, count);
}
HA_ATOMIC_ADD(&strm->be->totpend, 1);
+ strm->pend_pos = p; /* set last, once <p> is initialized and queued */
return p;
}
@@ -206,26 +204,29 @@ struct pendconn *pendconn_add(struct stream *strm)
*/
int pendconn_redistribute(struct server *s)
{
- struct pendconn *pc, *pc_bck;
+ struct pendconn *p, *pback;
int xferred = 0;
+ /* Only redistribute when REDISP is set without PERSIST: cookies are then
+ * ignored and the streams are balanced again or sent to the dispatcher. */
+ if ((s->proxy->options & (PR_O_REDISP|PR_O_PERSIST)) != PR_O_REDISP)
+ return 0;
+
HA_SPIN_LOCK(SERVER_LOCK, &s->lock);
- list_for_each_entry_safe(pc, pc_bck, &s->pendconns, list) {
- struct stream *strm = pc->strm;
+ list_for_each_entry_safe(p, pback, &s->pendconns, list) {
+ if (p->strm_flags & SF_FORCE_PRST)
+ continue;
- if ((strm->be->options & (PR_O_REDISP|PR_O_PERSIST)) == PR_O_REDISP &&
- !(strm->flags & SF_FORCE_PRST)) {
- /* The REDISP option was specified. We will ignore
- * cookie and force to balance or use the dispatcher.
- */
+ if (HA_SPIN_TRYLOCK(PENDCONN_LOCK, &p->lock))
+ continue;
- /* it's left to the dispatcher to choose a server */
- strm->flags &= ~(SF_DIRECT | SF_ASSIGNED | SF_ADDR_SET);
+ /* it's left to the dispatcher to choose a server */
+ pendconn_unlinked(p);
+ p->strm_flags = 0;
- __pendconn_free(pc);
- task_wakeup(strm->task, TASK_WOKEN_RES);
- xferred++;
- }
+ task_wakeup(p->strm->task, TASK_WOKEN_RES);
+ HA_SPIN_UNLOCK(PENDCONN_LOCK, &p->lock);
+ xferred++;
}
HA_SPIN_UNLOCK(SERVER_LOCK, &s->lock);
return xferred;
@@ -238,66 +238,121 @@ int pendconn_redistribute(struct server *s)
*/
int pendconn_grab_from_px(struct server *s)
{
- int xferred;
+ struct pendconn *p, *pback;
+ int maxconn, xferred = 0;
if (!srv_currently_usable(s))
return 0;
HA_SPIN_LOCK(PROXY_LOCK, &s->proxy->lock);
- for (xferred = 0; !s->maxconn || xferred < srv_dynamic_maxconn(s); xferred++) {
- struct stream *strm;
- struct pendconn *p;
-
- p = pendconn_from_px(s->proxy);
- if (!p)
+ maxconn = srv_dynamic_maxconn(s);
+ list_for_each_entry_safe(p, pback, &s->proxy->pendconns, list) {
+ if (s->maxconn && s->served + xferred >= maxconn)
break;
- p->strm->target = &s->obj_type;
- strm = p->strm;
- __pendconn_free(p);
- task_wakeup(strm->task, TASK_WOKEN_RES);
+
+ if (HA_SPIN_TRYLOCK(PENDCONN_LOCK, &p->lock)) /* never wait: pendconn_free() holds it while waiting for the proxy lock */
+ continue;
+
+ pendconn_unlinked(p);
+ p->strm_flags = (p->strm_flags & (SF_DIRECT | SF_ASSIGNED | SF_ADDR_SET));
+ p->srv = s; /* the stream's target is set by pendconn_dequeue() */
+
+ task_wakeup(p->strm->task, TASK_WOKEN_RES);
+ HA_SPIN_UNLOCK(PENDCONN_LOCK, &p->lock);
+ xferred++;
}
HA_SPIN_UNLOCK(PROXY_LOCK, &s->proxy->lock);
return xferred;
}
-/*
- * Detaches pending connection <p>, decreases the pending count, and frees
- * the pending connection. The connection might have been queued to a specific
- * server as well as to the proxy. The stream also gets marked unqueued.
+/* Tries to dequeue the pending connection attached to the stream <strm>, which
+ * must exist (the process aborts otherwise). If the pendconn is still linked to
+ * the server or proxy queue, nothing is done and the function returns 1.
+ * Otherwise, <strm>->flags and <strm>->target are updated from the pendconn,
+ * which is then released, and 0 is returned. Only the stream may call this.
+ */
+int pendconn_dequeue(struct stream *strm)
+{
+ struct pendconn *p;
+
+ if (unlikely(!strm->pend_pos)) {
+ /* unexpected case because it is called by the stream itself and
+ * only the stream can release a pendconn. So it is only
+ * possible if a pendconn is released by someone else or if the
+ * stream is supposed to be queued but without its associated
+ * pendconn. In both cases it is a bug! */
+ abort();
+ }
+ p = strm->pend_pos;
+ HA_SPIN_LOCK(PENDCONN_LOCK, &p->lock); /* waits for a dequeuer still updating <p> */
+
+ /* the pendconn is still linked to the server/proxy queue, so unlock it
+ * and go away. */
+ if (!LIST_ISEMPTY(&p->list)) {
+ HA_SPIN_UNLOCK(PENDCONN_LOCK, &p->lock);
+ return 1;
+ }
+
+ /* the pendconn must be dequeued now */
+ if (p->srv)
+ strm->target = &p->srv->obj_type;
+
+ strm->flags &= ~(SF_DIRECT | SF_ASSIGNED | SF_ADDR_SET);
+ strm->flags |= p->strm_flags;
+ strm->pend_pos = NULL;
+ HA_SPIN_UNLOCK(PENDCONN_LOCK, &p->lock);
+ pool_free(pool_head_pendconn, p);
+ return 0;
+}
+
+/* Releases the pending connection <p>. If it is still queued, it is first
+ * unlinked from its server or proxy queue and the pending counts are decreased.
+ * The stream also gets marked unqueued (its <pend_pos> is reset).
*/
void pendconn_free(struct pendconn *p)
{
+ struct stream *strm = p->strm;
+
+ HA_SPIN_LOCK(PENDCONN_LOCK, &p->lock); /* taken before the server/proxy lock: queue walkers only trylock it */
+
+ /* The pendconn was already unlinked, just release it. */
+ if (LIST_ISEMPTY(&p->list))
+ goto release;
+
if (p->srv) {
HA_SPIN_LOCK(SERVER_LOCK, &p->srv->lock);
+ p->srv->nbpend--;
LIST_DEL(&p->list);
HA_SPIN_UNLOCK(SERVER_LOCK, &p->srv->lock);
- HA_ATOMIC_SUB(&p->srv->nbpend, 1);
}
else {
- HA_SPIN_LOCK(SERVER_LOCK, &p->strm->be->lock);
+ HA_SPIN_LOCK(PROXY_LOCK, &strm->be->lock);
+ strm->be->nbpend--;
LIST_DEL(&p->list);
- HA_SPIN_UNLOCK(SERVER_LOCK, &p->strm->be->lock);
- HA_ATOMIC_SUB(&p->strm->be->nbpend, 1);
+ HA_SPIN_UNLOCK(PROXY_LOCK, &strm->be->lock);
}
- p->strm->pend_pos = NULL;
- HA_ATOMIC_SUB(&p->strm->be->totpend, 1);
+ HA_ATOMIC_SUB(&strm->be->totpend, 1);
+
+ release:
+ strm->pend_pos = NULL;
+ HA_SPIN_UNLOCK(PENDCONN_LOCK, &p->lock);
pool_free(pool_head_pendconn, p);
}
-/* Lock-free version of pendconn_free. */
-static void __pendconn_free(struct pendconn *p)
+/* Removes the pendconn from its server/proxy queue and decreases the pending
+ * counts. At this stage, the connection is not really dequeued yet: this is
+ * done later by the stream itself, via pendconn_dequeue(). The caller must hold
+ * the locks on both the pendconn _AND_ the server/proxy.
+ */
+static void pendconn_unlinked(struct pendconn *p)
{
- if (p->srv) {
- LIST_DEL(&p->list);
- HA_ATOMIC_SUB(&p->srv->nbpend, 1);
- }
- else {
- LIST_DEL(&p->list);
- HA_ATOMIC_SUB(&p->strm->be->nbpend, 1);
- }
- p->strm->pend_pos = NULL;
+ if (p->srv)
+ p->srv->nbpend--;
+ else
+ p->strm->be->nbpend--;
HA_ATOMIC_SUB(&p->strm->be->totpend, 1);
- pool_free(pool_head_pendconn, p);
+ LIST_DEL(&p->list);
+ LIST_INIT(&p->list); /* lets LIST_ISEMPTY() tell that <p> was unlinked */
}
/*
diff --git a/src/stream.c b/src/stream.c
index 92f9c0a6..7c42b83c 100644
--- a/src/stream.c
+++ b/src/stream.c
@@ -400,7 +400,6 @@ static void stream_free(struct stream *s)
if (must_free_sess)
session_free(sess);
-
pool_free(pool_head_stream, s);
/* We may want to free the maximum amount of pools if the proxy is stopping */
@@ -929,7 +928,7 @@ static void sess_update_stream_int(struct stream *s)
}
else if (si->state == SI_ST_QUE) {
/* connection request was queued, check for any update */
- if (!s->pend_pos) {
+ if (!pendconn_dequeue(s)) {
/* The connection is not in the queue anymore. Either
* we have a server connection slot available and we
* go directly to the assigned state, or we need to
--
2.14.3