Author: mav
Date: Sun Jun  8 11:19:32 2014
New Revision: 267228
URL: http://svnweb.freebsd.org/changeset/base/267228

Log:
  Split RPC pool threads into a number of smaller semi-isolated groups.
  
  The old design with a unified thread pool was good from the point of
  thread utilization, but the single pool-wide mutex became a huge
  congestion point on systems with many CPUs.  To reduce the congestion,
  create several thread groups within a pool (one group for every 6 CPUs
  and 12 threads), each group with its own mutex.  Each connection is
  assigned to one of the groups in round-robin fashion at registration
  time.  File affinity code may still move requests between the groups,
  but otherwise the groups are self-contained.
  
  MFC after:    2 weeks
  Sponsored by: iXsystems, Inc.
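
The round-robin placement described in the log is implemented in
xprt_register() below: a pool-wide counter is bumped atomically and
taken modulo the group count.  A minimal userland sketch of the same
pattern (names are hypothetical; C11 atomics stand in for the kernel's
atomic_fetchadd_int()):

    #include <stdatomic.h>
    #include <stdio.h>

    #define MAXGROUPS       16

    struct group {
            int id;                 /* stand-in for per-group state */
    };

    struct pool {
            atomic_uint nextgroup;  /* ever-increasing assignment counter */
            int groupcount;         /* groups actually in use */
            struct group groups[MAXGROUPS];
    };

    /* Pick a group the way xprt_register() does: fetch-and-add, then mod. */
    static struct group *
    assign_group(struct pool *p)
    {
            unsigned int n = atomic_fetch_add(&p->nextgroup, 1);

            return (&p->groups[n % p->groupcount]);
    }

    int
    main(void)
    {
            struct pool p = { .groupcount = 3 };
            int i;

            for (i = 0; i < MAXGROUPS; i++)
                    p.groups[i].id = i;
            for (i = 0; i < 7; i++)         /* seven "connections" */
                    printf("conn %d -> group %d\n", i,
                        assign_group(&p)->id);
            return (0);
    }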

Modified:
  head/sys/rpc/svc.c
  head/sys/rpc/svc.h
  head/sys/rpc/svc_generic.c

Modified: head/sys/rpc/svc.c
==============================================================================
--- head/sys/rpc/svc.c  Sun Jun  8 10:56:25 2014        (r267227)
+++ head/sys/rpc/svc.c  Sun Jun  8 11:19:32 2014        (r267228)
@@ -56,6 +56,7 @@ __FBSDID("$FreeBSD$");
 #include <sys/queue.h>
 #include <sys/socketvar.h>
 #include <sys/systm.h>
+#include <sys/smp.h>
 #include <sys/sx.h>
 #include <sys/ucred.h>
 
@@ -70,7 +71,7 @@ __FBSDID("$FreeBSD$");
 
 static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t,
     char *);
-static void svc_new_thread(SVCPOOL *pool);
+static void svc_new_thread(SVCGROUP *grp);
 static void xprt_unregister_locked(SVCXPRT *xprt);
 static void svc_change_space_used(SVCPOOL *pool, int delta);
 static bool_t svc_request_space_available(SVCPOOL *pool);
@@ -79,11 +80,14 @@ static bool_t svc_request_space_availabl
 
 static int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS);
 static int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS);
+static int svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS);
 
 SVCPOOL*
 svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base)
 {
        SVCPOOL *pool;
+       SVCGROUP *grp;
+       int g;
 
        pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO);
        
@@ -91,15 +95,22 @@ svcpool_create(const char *name, struct 
        pool->sp_name = name;
        pool->sp_state = SVCPOOL_INIT;
        pool->sp_proc = NULL;
-       TAILQ_INIT(&pool->sp_xlist);
-       TAILQ_INIT(&pool->sp_active);
        TAILQ_INIT(&pool->sp_callouts);
        TAILQ_INIT(&pool->sp_lcallouts);
-       LIST_INIT(&pool->sp_threads);
-       LIST_INIT(&pool->sp_idlethreads);
        pool->sp_minthreads = 1;
        pool->sp_maxthreads = 1;
-       pool->sp_threadcount = 0;
+       pool->sp_groupcount = 1;
+       for (g = 0; g < SVC_MAXGROUPS; g++) {
+               grp = &pool->sp_groups[g];
+               mtx_init(&grp->sg_lock, "sg_lock", NULL, MTX_DEF);
+               grp->sg_pool = pool;
+               grp->sg_state = SVCPOOL_ACTIVE;
+               TAILQ_INIT(&grp->sg_xlist);
+               TAILQ_INIT(&grp->sg_active);
+               LIST_INIT(&grp->sg_idlethreads);
+               grp->sg_minthreads = 1;
+               grp->sg_maxthreads = 1;
+       }
 
        /*
         * Don't use more than a quarter of mbuf clusters or more than
@@ -114,12 +125,19 @@ svcpool_create(const char *name, struct 
        if (sysctl_base) {
                SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
                    "minthreads", CTLTYPE_INT | CTLFLAG_RW,
-                   pool, 0, svcpool_minthread_sysctl, "I", "");
+                   pool, 0, svcpool_minthread_sysctl, "I",
+                   "Minimal number of threads");
                SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
                    "maxthreads", CTLTYPE_INT | CTLFLAG_RW,
-                   pool, 0, svcpool_maxthread_sysctl, "I", "");
+                   pool, 0, svcpool_maxthread_sysctl, "I",
+                   "Maximal number of threads");
+               SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
+                   "threads", CTLTYPE_INT | CTLFLAG_RD,
+                   pool, 0, svcpool_threads_sysctl, "I",
+                   "Current number of threads");
                SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
-                   "threads", CTLFLAG_RD, &pool->sp_threadcount, 0, "");
+                   "groups", CTLFLAG_RD, &pool->sp_groupcount, 0,
+                   "Number of thread groups");
 
                SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
                    "request_space_used", CTLFLAG_RD,
@@ -158,20 +176,29 @@ svcpool_create(const char *name, struct 
 void
 svcpool_destroy(SVCPOOL *pool)
 {
+       SVCGROUP *grp;
        SVCXPRT *xprt, *nxprt;
        struct svc_callout *s;
        struct svc_loss_callout *sl;
        struct svcxprt_list cleanup;
+       int g;
 
        TAILQ_INIT(&cleanup);
-       mtx_lock(&pool->sp_lock);
 
-       while (TAILQ_FIRST(&pool->sp_xlist)) {
-               xprt = TAILQ_FIRST(&pool->sp_xlist);
-               xprt_unregister_locked(xprt);
-               TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
+       for (g = 0; g < SVC_MAXGROUPS; g++) {
+               grp = &pool->sp_groups[g];
+               mtx_lock(&grp->sg_lock);
+               while ((xprt = TAILQ_FIRST(&grp->sg_xlist)) != NULL) {
+                       xprt_unregister_locked(xprt);
+                       TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
+               }
+               mtx_unlock(&grp->sg_lock);
+       }
+       TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
+               SVC_RELEASE(xprt);
        }
 
+       mtx_lock(&pool->sp_lock);
        while ((s = TAILQ_FIRST(&pool->sp_callouts)) != NULL) {
                mtx_unlock(&pool->sp_lock);
                svc_unreg(pool, s->sc_prog, s->sc_vers);
@@ -184,10 +211,10 @@ svcpool_destroy(SVCPOOL *pool)
        }
        mtx_unlock(&pool->sp_lock);
 
-       TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
-               SVC_RELEASE(xprt);
+       for (g = 0; g < SVC_MAXGROUPS; g++) {
+               grp = &pool->sp_groups[g];
+               mtx_destroy(&grp->sg_lock);
        }
-
        mtx_destroy(&pool->sp_lock);
 
        if (pool->sp_rcache)
@@ -197,14 +224,23 @@ svcpool_destroy(SVCPOOL *pool)
        free(pool, M_RPC);
 }
 
-static bool_t
-svcpool_active(SVCPOOL *pool)
+/*
+ * Sysctl handler to get the present thread count on a pool
+ */
+static int
+svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS)
 {
-       enum svcpool_state state = pool->sp_state;
+       SVCPOOL *pool;
+       int threads, error, g;
 
-       if (state == SVCPOOL_INIT || state == SVCPOOL_CLOSING)
-               return (FALSE);
-       return (TRUE);
+       pool = oidp->oid_arg1;
+       threads = 0;
+       mtx_lock(&pool->sp_lock);
+       for (g = 0; g < pool->sp_groupcount; g++)
+               threads += pool->sp_groups[g].sg_threadcount;
+       mtx_unlock(&pool->sp_lock);
+       error = sysctl_handle_int(oidp, &threads, 0, req);
+       return (error);
 }
 
 /*
@@ -214,7 +250,7 @@ static int
 svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS)
 {
        SVCPOOL *pool;
-       int newminthreads, error, n;
+       int newminthreads, error, g;
 
        pool = oidp->oid_arg1;
        newminthreads = pool->sp_minthreads;
@@ -223,21 +259,11 @@ svcpool_minthread_sysctl(SYSCTL_HANDLER_
                if (newminthreads > pool->sp_maxthreads)
                        return (EINVAL);
                mtx_lock(&pool->sp_lock);
-               if (newminthreads > pool->sp_minthreads
-                   && svcpool_active(pool)) {
-                       /*
-                        * If the pool is running and we are
-                        * increasing, create some more threads now.
-                        */
-                       n = newminthreads - pool->sp_threadcount;
-                       if (n > 0) {
-                               mtx_unlock(&pool->sp_lock);
-                               while (n--)
-                                       svc_new_thread(pool);
-                               mtx_lock(&pool->sp_lock);
-                       }
-               }
                pool->sp_minthreads = newminthreads;
+               for (g = 0; g < pool->sp_groupcount; g++) {
+                       pool->sp_groups[g].sg_minthreads = max(1,
+                           pool->sp_minthreads / pool->sp_groupcount);
+               }
                mtx_unlock(&pool->sp_lock);
        }
        return (error);
@@ -250,8 +276,7 @@ static int
 svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS)
 {
        SVCPOOL *pool;
-       SVCTHREAD *st;
-       int newmaxthreads, error;
+       int newmaxthreads, error, g;
 
        pool = oidp->oid_arg1;
        newmaxthreads = pool->sp_maxthreads;
@@ -260,17 +285,11 @@ svcpool_maxthread_sysctl(SYSCTL_HANDLER_
                if (newmaxthreads < pool->sp_minthreads)
                        return (EINVAL);
                mtx_lock(&pool->sp_lock);
-               if (newmaxthreads < pool->sp_maxthreads
-                   && svcpool_active(pool)) {
-                       /*
-                        * If the pool is running and we are
-                        * decreasing, wake up some idle threads to
-                        * encourage them to exit.
-                        */
-                       LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink)
-                               cv_signal(&st->st_cond);
-               }
                pool->sp_maxthreads = newmaxthreads;
+               for (g = 0; g < pool->sp_groupcount; g++) {
+                       pool->sp_groups[g].sg_maxthreads = max(1,
+                           pool->sp_maxthreads / pool->sp_groupcount);
+               }
                mtx_unlock(&pool->sp_lock);
        }
        return (error);
@@ -283,13 +302,17 @@ void
 xprt_register(SVCXPRT *xprt)
 {
        SVCPOOL *pool = xprt->xp_pool;
+       SVCGROUP *grp;
+       int g;
 
        SVC_ACQUIRE(xprt);
-       mtx_lock(&pool->sp_lock);
+       g = atomic_fetchadd_int(&pool->sp_nextgroup, 1) % pool->sp_groupcount;
+       xprt->xp_group = grp = &pool->sp_groups[g];
+       mtx_lock(&grp->sg_lock);
        xprt->xp_registered = TRUE;
        xprt->xp_active = FALSE;
-       TAILQ_INSERT_TAIL(&pool->sp_xlist, xprt, xp_link);
-       mtx_unlock(&pool->sp_lock);
+       TAILQ_INSERT_TAIL(&grp->sg_xlist, xprt, xp_link);
+       mtx_unlock(&grp->sg_lock);
 }
 
 /*
@@ -300,29 +323,29 @@ xprt_register(SVCXPRT *xprt)
 static void
 xprt_unregister_locked(SVCXPRT *xprt)
 {
-       SVCPOOL *pool = xprt->xp_pool;
+       SVCGROUP *grp = xprt->xp_group;
 
-       mtx_assert(&pool->sp_lock, MA_OWNED);
+       mtx_assert(&grp->sg_lock, MA_OWNED);
        KASSERT(xprt->xp_registered == TRUE,
            ("xprt_unregister_locked: not registered"));
        xprt_inactive_locked(xprt);
-       TAILQ_REMOVE(&pool->sp_xlist, xprt, xp_link);
+       TAILQ_REMOVE(&grp->sg_xlist, xprt, xp_link);
        xprt->xp_registered = FALSE;
 }
 
 void
 xprt_unregister(SVCXPRT *xprt)
 {
-       SVCPOOL *pool = xprt->xp_pool;
+       SVCGROUP *grp = xprt->xp_group;
 
-       mtx_lock(&pool->sp_lock);
+       mtx_lock(&grp->sg_lock);
        if (xprt->xp_registered == FALSE) {
                /* Already unregistered by another thread */
-               mtx_unlock(&pool->sp_lock);
+               mtx_unlock(&grp->sg_lock);
                return;
        }
        xprt_unregister_locked(xprt);
-       mtx_unlock(&pool->sp_lock);
+       mtx_unlock(&grp->sg_lock);
 
        SVC_RELEASE(xprt);
 }
@@ -333,11 +356,11 @@ xprt_unregister(SVCXPRT *xprt)
 static int
 xprt_assignthread(SVCXPRT *xprt)
 {
-       SVCPOOL *pool = xprt->xp_pool;
+       SVCGROUP *grp = xprt->xp_group;
        SVCTHREAD *st;
 
-       mtx_assert(&pool->sp_lock, MA_OWNED);
-       st = LIST_FIRST(&pool->sp_idlethreads);
+       mtx_assert(&grp->sg_lock, MA_OWNED);
+       st = LIST_FIRST(&grp->sg_idlethreads);
        if (st) {
                LIST_REMOVE(st, st_ilink);
                SVC_ACQUIRE(xprt);
@@ -354,10 +377,10 @@ xprt_assignthread(SVCXPRT *xprt)
                 * from a socket upcall). Don't create more
                 * than one thread per second.
                 */
-               if (pool->sp_state == SVCPOOL_ACTIVE
-                   && pool->sp_lastcreatetime < time_uptime
-                   && pool->sp_threadcount < pool->sp_maxthreads) {
-                       pool->sp_state = SVCPOOL_THREADWANTED;
+               if (grp->sg_state == SVCPOOL_ACTIVE
+                   && grp->sg_lastcreatetime < time_uptime
+                   && grp->sg_threadcount < grp->sg_maxthreads) {
+                       grp->sg_state = SVCPOOL_THREADWANTED;
                }
        }
        return (FALSE);
@@ -366,40 +389,40 @@ xprt_assignthread(SVCXPRT *xprt)
 void
 xprt_active(SVCXPRT *xprt)
 {
-       SVCPOOL *pool = xprt->xp_pool;
+       SVCGROUP *grp = xprt->xp_group;
 
-       mtx_lock(&pool->sp_lock);
+       mtx_lock(&grp->sg_lock);
 
        if (!xprt->xp_registered) {
                /*
                 * Race with xprt_unregister - we lose.
                 */
-               mtx_unlock(&pool->sp_lock);
+               mtx_unlock(&grp->sg_lock);
                return;
        }
 
        if (!xprt->xp_active) {
                xprt->xp_active = TRUE;
                if (xprt->xp_thread == NULL) {
-                       if (!svc_request_space_available(pool) ||
+                       if (!svc_request_space_available(xprt->xp_pool) ||
                            !xprt_assignthread(xprt))
-                               TAILQ_INSERT_TAIL(&pool->sp_active, xprt,
+                               TAILQ_INSERT_TAIL(&grp->sg_active, xprt,
                                    xp_alink);
                }
        }
 
-       mtx_unlock(&pool->sp_lock);
+       mtx_unlock(&grp->sg_lock);
 }
 
 void
 xprt_inactive_locked(SVCXPRT *xprt)
 {
-       SVCPOOL *pool = xprt->xp_pool;
+       SVCGROUP *grp = xprt->xp_group;
 
-       mtx_assert(&pool->sp_lock, MA_OWNED);
+       mtx_assert(&grp->sg_lock, MA_OWNED);
        if (xprt->xp_active) {
                if (xprt->xp_thread == NULL)
-                       TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
+                       TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
                xprt->xp_active = FALSE;
        }
 }
@@ -407,11 +430,11 @@ xprt_inactive_locked(SVCXPRT *xprt)
 void
 xprt_inactive(SVCXPRT *xprt)
 {
-       SVCPOOL *pool = xprt->xp_pool;
+       SVCGROUP *grp = xprt->xp_group;
 
-       mtx_lock(&pool->sp_lock);
+       mtx_lock(&grp->sg_lock);
        xprt_inactive_locked(xprt);
-       mtx_unlock(&pool->sp_lock);
+       mtx_unlock(&grp->sg_lock);
 }
 
 /*
@@ -991,14 +1014,14 @@ svc_executereq(struct svc_req *rqstp)
 }
 
 static void
-svc_checkidle(SVCPOOL *pool)
+svc_checkidle(SVCGROUP *grp)
 {
        SVCXPRT *xprt, *nxprt;
        time_t timo;
        struct svcxprt_list cleanup;
 
        TAILQ_INIT(&cleanup);
-       TAILQ_FOREACH_SAFE(xprt, &pool->sp_xlist, xp_link, nxprt) {
+       TAILQ_FOREACH_SAFE(xprt, &grp->sg_xlist, xp_link, nxprt) {
                /*
                 * Only some transports have idle timers. Don't time
                 * something out which is just waking up.
@@ -1013,27 +1036,31 @@ svc_checkidle(SVCPOOL *pool)
                }
        }
 
-       mtx_unlock(&pool->sp_lock);
+       mtx_unlock(&grp->sg_lock);
        TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
                SVC_RELEASE(xprt);
        }
-       mtx_lock(&pool->sp_lock);
-
+       mtx_lock(&grp->sg_lock);
 }
 
 static void
 svc_assign_waiting_sockets(SVCPOOL *pool)
 {
+       SVCGROUP *grp;
        SVCXPRT *xprt;
+       int g;
 
-       mtx_lock(&pool->sp_lock);
-       while ((xprt = TAILQ_FIRST(&pool->sp_active)) != NULL) {
-               if (xprt_assignthread(xprt))
-                       TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
-               else
-                       break;
+       for (g = 0; g < pool->sp_groupcount; g++) {
+               grp = &pool->sp_groups[g];
+               mtx_lock(&grp->sg_lock);
+               while ((xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
+                       if (xprt_assignthread(xprt))
+                               TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
+                       else
+                               break;
+               }
+               mtx_unlock(&grp->sg_lock);
        }
-       mtx_unlock(&pool->sp_lock);
 }
 
 static void
@@ -1067,8 +1094,9 @@ svc_request_space_available(SVCPOOL *poo
 }
 
 static void
-svc_run_internal(SVCPOOL *pool, bool_t ismaster)
+svc_run_internal(SVCGROUP *grp, bool_t ismaster)
 {
+       SVCPOOL *pool = grp->sg_pool;
        SVCTHREAD *st, *stpref;
        SVCXPRT *xprt;
        enum xprt_stat stat;
@@ -1083,35 +1111,34 @@ svc_run_internal(SVCPOOL *pool, bool_t i
        STAILQ_INIT(&st->st_reqs);
        cv_init(&st->st_cond, "rpcsvc");
 
-       mtx_lock(&pool->sp_lock);
-       LIST_INSERT_HEAD(&pool->sp_threads, st, st_link);
+       mtx_lock(&grp->sg_lock);
 
        /*
         * If we are a new thread which was spawned to cope with
         * increased load, set the state back to SVCPOOL_ACTIVE.
         */
-       if (pool->sp_state == SVCPOOL_THREADSTARTING)
-               pool->sp_state = SVCPOOL_ACTIVE;
+       if (grp->sg_state == SVCPOOL_THREADSTARTING)
+               grp->sg_state = SVCPOOL_ACTIVE;
 
-       while (pool->sp_state != SVCPOOL_CLOSING) {
+       while (grp->sg_state != SVCPOOL_CLOSING) {
                /*
                 * Create new thread if requested.
                 */
-               if (pool->sp_state == SVCPOOL_THREADWANTED) {
-                       pool->sp_state = SVCPOOL_THREADSTARTING;
-                       pool->sp_lastcreatetime = time_uptime;
-                       mtx_unlock(&pool->sp_lock);
-                       svc_new_thread(pool);
-                       mtx_lock(&pool->sp_lock);
+               if (grp->sg_state == SVCPOOL_THREADWANTED) {
+                       grp->sg_state = SVCPOOL_THREADSTARTING;
+                       grp->sg_lastcreatetime = time_uptime;
+                       mtx_unlock(&grp->sg_lock);
+                       svc_new_thread(grp);
+                       mtx_lock(&grp->sg_lock);
                        continue;
                }
 
                /*
                 * Check for idle transports once per second.
                 */
-               if (time_uptime > pool->sp_lastidlecheck) {
-                       pool->sp_lastidlecheck = time_uptime;
-                       svc_checkidle(pool);
+               if (time_uptime > grp->sg_lastidlecheck) {
+                       grp->sg_lastidlecheck = time_uptime;
+                       svc_checkidle(grp);
                }
 
                xprt = st->st_xprt;
@@ -1119,7 +1146,7 @@ svc_run_internal(SVCPOOL *pool, bool_t i
                        /*
                         * Enforce maxthreads count.
                         */
-                       if (pool->sp_threadcount > pool->sp_maxthreads)
+                       if (grp->sg_threadcount > grp->sg_maxthreads)
                                break;
 
                        /*
@@ -1128,22 +1155,22 @@ svc_run_internal(SVCPOOL *pool, bool_t i
                         * by a thread.
                         */
                        if (svc_request_space_available(pool) &&
-                           (xprt = TAILQ_FIRST(&pool->sp_active)) != NULL) {
-                               TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
+                           (xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
+                               TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
                                SVC_ACQUIRE(xprt);
                                xprt->xp_thread = st;
                                st->st_xprt = xprt;
                                continue;
                        }
 
-                       LIST_INSERT_HEAD(&pool->sp_idlethreads, st, st_ilink);
+                       LIST_INSERT_HEAD(&grp->sg_idlethreads, st, st_ilink);
                        if (ismaster || (!ismaster &&
-                           pool->sp_threadcount > pool->sp_minthreads))
+                           grp->sg_threadcount > grp->sg_minthreads))
                                error = cv_timedwait_sig(&st->st_cond,
-                                   &pool->sp_lock, 5 * hz);
+                                   &grp->sg_lock, 5 * hz);
                        else
                                error = cv_wait_sig(&st->st_cond,
-                                   &pool->sp_lock);
+                                   &grp->sg_lock);
                        if (st->st_xprt == NULL)
                                LIST_REMOVE(st, st_ilink);
 
@@ -1152,19 +1179,19 @@ svc_run_internal(SVCPOOL *pool, bool_t i
                         */
                        if (error == EWOULDBLOCK) {
                                if (!ismaster
-                                   && (pool->sp_threadcount
-                                       > pool->sp_minthreads)
+                                   && (grp->sg_threadcount
+                                       > grp->sg_minthreads)
                                        && !st->st_xprt)
                                        break;
                        } else if (error) {
-                               mtx_unlock(&pool->sp_lock);
+                               mtx_unlock(&grp->sg_lock);
                                svc_exit(pool);
-                               mtx_lock(&pool->sp_lock);
+                               mtx_lock(&grp->sg_lock);
                                break;
                        }
                        continue;
                }
-               mtx_unlock(&pool->sp_lock);
+               mtx_unlock(&grp->sg_lock);
 
                /*
                 * Drain the transport socket and queue up any RPCs.
@@ -1194,7 +1221,7 @@ svc_run_internal(SVCPOOL *pool, bool_t i
                                            rqstp, rq_link);
                        }
                } while (rqstp == NULL && stat == XPRT_MOREREQS
-                   && pool->sp_state != SVCPOOL_CLOSING);
+                   && grp->sg_state != SVCPOOL_CLOSING);
 
                /*
                 * Move this transport to the end of the active list to
@@ -1202,16 +1229,16 @@ svc_run_internal(SVCPOOL *pool, bool_t i
                 * If this was the last queued request, svc_getreq will end
                 * up calling xprt_inactive to remove from the active list.
                 */
-               mtx_lock(&pool->sp_lock);
+               mtx_lock(&grp->sg_lock);
                xprt->xp_thread = NULL;
                st->st_xprt = NULL;
                if (xprt->xp_active) {
                        if (!svc_request_space_available(pool) ||
                            !xprt_assignthread(xprt))
-                               TAILQ_INSERT_TAIL(&pool->sp_active,
+                               TAILQ_INSERT_TAIL(&grp->sg_active,
                                    xprt, xp_alink);
                }
-               mtx_unlock(&pool->sp_lock);
+               mtx_unlock(&grp->sg_lock);
                SVC_RELEASE(xprt);
 
                /*
@@ -1228,7 +1255,7 @@ svc_run_internal(SVCPOOL *pool, bool_t i
                }
                mtx_unlock(&st->st_lock);
                svc_change_space_used(pool, -sz);
-               mtx_lock(&pool->sp_lock);
+               mtx_lock(&grp->sg_lock);
        }
 
        if (st->st_xprt) {
@@ -1236,46 +1263,43 @@ svc_run_internal(SVCPOOL *pool, bool_t i
                st->st_xprt = NULL;
                SVC_RELEASE(xprt);
        }
-
        KASSERT(STAILQ_EMPTY(&st->st_reqs), ("stray reqs on exit"));
-       LIST_REMOVE(st, st_link);
-       pool->sp_threadcount--;
-
-       mtx_unlock(&pool->sp_lock);
-
        mtx_destroy(&st->st_lock);
        cv_destroy(&st->st_cond);
        mem_free(st, sizeof(*st));
 
+       grp->sg_threadcount--;
        if (!ismaster)
-               wakeup(pool);
+               wakeup(grp);
+       mtx_unlock(&grp->sg_lock);
 }
 
 static void
 svc_thread_start(void *arg)
 {
 
-       svc_run_internal((SVCPOOL *) arg, FALSE);
+       svc_run_internal((SVCGROUP *) arg, FALSE);
        kthread_exit();
 }
 
 static void
-svc_new_thread(SVCPOOL *pool)
+svc_new_thread(SVCGROUP *grp)
 {
+       SVCPOOL *pool = grp->sg_pool;
        struct thread *td;
 
-       pool->sp_threadcount++;
-       kthread_add(svc_thread_start, pool,
-           pool->sp_proc, &td, 0, 0,
+       grp->sg_threadcount++;
+       kthread_add(svc_thread_start, grp, pool->sp_proc, &td, 0, 0,
            "%s: service", pool->sp_name);
 }
 
 void
 svc_run(SVCPOOL *pool)
 {
-       int i;
+       int g, i;
        struct proc *p;
        struct thread *td;
+       SVCGROUP *grp;
 
        p = curproc;
        td = curthread;
@@ -1283,35 +1307,56 @@ svc_run(SVCPOOL *pool)
            "%s: master", pool->sp_name);
        pool->sp_state = SVCPOOL_ACTIVE;
        pool->sp_proc = p;
-       pool->sp_lastcreatetime = time_uptime;
-       pool->sp_threadcount = 1;
 
-       for (i = 1; i < pool->sp_minthreads; i++) {
-               svc_new_thread(pool);
+       /* Choose group count based on number of threads and CPUs. */
+       pool->sp_groupcount = max(1, min(SVC_MAXGROUPS,
+           min(pool->sp_maxthreads / 2, mp_ncpus) / 6));
+       for (g = 0; g < pool->sp_groupcount; g++) {
+               grp = &pool->sp_groups[g];
+               grp->sg_minthreads = max(1,
+                   pool->sp_minthreads / pool->sp_groupcount);
+               grp->sg_maxthreads = max(1,
+                   pool->sp_maxthreads / pool->sp_groupcount);
+               grp->sg_lastcreatetime = time_uptime;
+       }
+
+       /* Starting threads */
+       for (g = 0; g < pool->sp_groupcount; g++) {
+               grp = &pool->sp_groups[g];
+               for (i = ((g == 0) ? 1 : 0); i < grp->sg_minthreads; i++)
+                       svc_new_thread(grp);
+       }
+       pool->sp_groups[0].sg_threadcount++;
+       svc_run_internal(&pool->sp_groups[0], TRUE);
+
+       /* Waiting for threads to stop. */
+       for (g = 0; g < pool->sp_groupcount; g++) {
+               grp = &pool->sp_groups[g];
+               mtx_lock(&grp->sg_lock);
+               while (grp->sg_threadcount > 0)
+                       msleep(grp, &grp->sg_lock, 0, "svcexit", 0);
+               mtx_unlock(&grp->sg_lock);
        }
-
-       svc_run_internal(pool, TRUE);
-
-       mtx_lock(&pool->sp_lock);
-       while (pool->sp_threadcount > 0)
-               msleep(pool, &pool->sp_lock, 0, "svcexit", 0);
-       mtx_unlock(&pool->sp_lock);
 }
 
 void
 svc_exit(SVCPOOL *pool)
 {
+       SVCGROUP *grp;
        SVCTHREAD *st;
+       int g;
 
-       mtx_lock(&pool->sp_lock);
-
-       if (pool->sp_state != SVCPOOL_CLOSING) {
-               pool->sp_state = SVCPOOL_CLOSING;
-               LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink)
-                       cv_signal(&st->st_cond);
+       pool->sp_state = SVCPOOL_CLOSING;
+       for (g = 0; g < pool->sp_groupcount; g++) {
+               grp = &pool->sp_groups[g];
+               mtx_lock(&grp->sg_lock);
+               if (grp->sg_state != SVCPOOL_CLOSING) {
+                       grp->sg_state = SVCPOOL_CLOSING;
+                       LIST_FOREACH(st, &grp->sg_idlethreads, st_ilink)
+                               cv_signal(&st->st_cond);
+               }
+               mtx_unlock(&grp->sg_lock);
        }
-
-       mtx_unlock(&pool->sp_lock);
 }
 
 bool_t

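The group count computed in svc_run() above is capped at SVC_MAXGROUPS
(16) and scales at one group per six of whichever is smaller: half the
maximum thread count or the CPU count.  A standalone sketch of that
arithmetic (the formula is taken from the hunk above; the wrapper names
are made up):

    #include <stdio.h>

    #define SVC_MAXGROUPS   16

    static int imin(int a, int b) { return (a < b ? a : b); }
    static int imax(int a, int b) { return (a > b ? a : b); }

    /* The expression svc_run() uses to size the group array. */
    static int
    groupcount(int maxthreads, int ncpus)
    {

            return (imax(1, imin(SVC_MAXGROUPS,
                imin(maxthreads / 2, ncpus) / 6)));
    }

    int
    main(void)
    {

            /* 8 CPUs, 64 threads: min(32, 8) / 6 = 1 group. */
            printf("%d\n", groupcount(64, 8));
            /* 32 CPUs, 256 threads: min(128, 32) / 6 = 5 groups. */
            printf("%d\n", groupcount(256, 32));
            /* 128 CPUs, 512 threads: 128 / 6 = 21, capped at 16. */
            printf("%d\n", groupcount(512, 128));
            return (0);
    }
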
Modified: head/sys/rpc/svc.h
==============================================================================
--- head/sys/rpc/svc.h  Sun Jun  8 10:56:25 2014        (r267227)
+++ head/sys/rpc/svc.h  Sun Jun  8 11:19:32 2014        (r267228)
@@ -137,6 +137,7 @@ struct xp_ops2 {
 
 #ifdef _KERNEL
 struct __rpc_svcpool;
+struct __rpc_svcgroup;
 struct __rpc_svcthread;
 #endif
 
@@ -150,6 +151,7 @@ typedef struct __rpc_svcxprt {
        volatile u_int  xp_refs;
        struct sx       xp_lock;
        struct __rpc_svcpool *xp_pool;  /* owning pool (see below) */
+       struct __rpc_svcgroup *xp_group; /* owning group (see below) */
        TAILQ_ENTRY(__rpc_svcxprt) xp_link;
        TAILQ_ENTRY(__rpc_svcxprt) xp_alink;
        bool_t          xp_registered;  /* xprt_register has been called */
@@ -245,8 +247,6 @@ struct svc_loss_callout {
 };
 TAILQ_HEAD(svc_loss_callout_list, svc_loss_callout);
 
-struct __rpc_svcthread;
-
 /*
  * Service request
  */
@@ -296,7 +296,6 @@ typedef struct __rpc_svcthread {
        SVCXPRT                 *st_xprt; /* transport we are processing */
        struct svc_reqlist      st_reqs;  /* RPC requests to execute */
        struct cv               st_cond; /* sleeping for work */
-       LIST_ENTRY(__rpc_svcthread) st_link; /* all threads list */
        LIST_ENTRY(__rpc_svcthread) st_ilink; /* idle threads list */
        LIST_ENTRY(__rpc_svcthread) st_alink; /* application thread list */
        int             st_p2;          /* application workspace */
@@ -305,6 +304,36 @@ typedef struct __rpc_svcthread {
 LIST_HEAD(svcthread_list, __rpc_svcthread);
 
 /*
+ * A thread group contains all information needed to assign a subset of
+ * transports to a subset of threads.  On systems with many CPUs and
+ * many threads this reduces lock congestion and improves performance;
+ * hundreds of threads on dozens of CPUs sharing a single pool lock do
+ * not scale well otherwise.
+ */
+TAILQ_HEAD(svcxprt_list, __rpc_svcxprt);
+enum svcpool_state {
+       SVCPOOL_INIT,           /* svc_run not called yet */
+       SVCPOOL_ACTIVE,         /* normal running state */
+       SVCPOOL_THREADWANTED,   /* new service thread requested */
+       SVCPOOL_THREADSTARTING, /* new service thread started */
+       SVCPOOL_CLOSING         /* svc_exit called */
+};
+typedef struct __rpc_svcgroup {
+       struct mtx_padalign sg_lock;    /* protect the thread/req lists */
+       struct __rpc_svcpool    *sg_pool;
+       enum svcpool_state sg_state;    /* current pool state */
+       struct svcxprt_list sg_xlist;   /* all transports in the group */
+       struct svcxprt_list sg_active;  /* transports needing service */
+       struct svcthread_list sg_idlethreads; /* idle service threads */
+
+       int             sg_minthreads;  /* minimum service thread count */
+       int             sg_maxthreads;  /* maximum service thread count */
+       int             sg_threadcount; /* current service thread count */
+       time_t          sg_lastcreatetime; /* when we last started a thread */
+       time_t          sg_lastidlecheck;  /* when we last checked idle
+                                             transports */
+} SVCGROUP;
+
+/*
  * In the kernel, we can't use global variables to store lists of
  * transports etc. since otherwise we could not have two unrelated RPC
  * services running, each on its own thread. We solve this by
@@ -316,32 +345,18 @@ LIST_HEAD(svcthread_list, __rpc_svcthrea
  * this to support something similar to the Solaris multi-threaded RPC
  * server.
  */
-TAILQ_HEAD(svcxprt_list, __rpc_svcxprt);
-enum svcpool_state {
-       SVCPOOL_INIT,           /* svc_run not called yet */
-       SVCPOOL_ACTIVE,         /* normal running state */
-       SVCPOOL_THREADWANTED,   /* new service thread requested */
-       SVCPOOL_THREADSTARTING, /* new service thread started */
-       SVCPOOL_CLOSING         /* svc_exit called */
-};
 typedef SVCTHREAD *pool_assign_fn(SVCTHREAD *, struct svc_req *);
 typedef void pool_done_fn(SVCTHREAD *, struct svc_req *);
+#define        SVC_MAXGROUPS   16
 typedef struct __rpc_svcpool {
        struct mtx_padalign sp_lock;    /* protect the transport lists */
        const char      *sp_name;       /* pool name (e.g. "nfsd", "NLM" */
        enum svcpool_state sp_state;    /* current pool state */
        struct proc     *sp_proc;       /* process which is in svc_run */
-       struct svcxprt_list sp_xlist;   /* all transports in the pool */
-       struct svcxprt_list sp_active;  /* transports needing service */
        struct svc_callout_list sp_callouts; /* (prog,vers)->dispatch list */
        struct svc_loss_callout_list sp_lcallouts; /* loss->dispatch list */
-       struct svcthread_list sp_threads; /* service threads */
-       struct svcthread_list sp_idlethreads; /* idle service threads */
        int             sp_minthreads;  /* minimum service thread count */
        int             sp_maxthreads;  /* maximum service thread count */
-       int             sp_threadcount; /* current service thread count */
-       time_t          sp_lastcreatetime; /* when we last started a thread */
-       time_t          sp_lastidlecheck;  /* when we last checked idle
-                                              transports */
 
        /*
         * Hooks to allow an application to control request to thread
@@ -364,6 +379,10 @@ typedef struct __rpc_svcpool {
 
        struct replay_cache *sp_rcache; /* optional replay cache */
        struct sysctl_ctx_list sp_sysctl;
+
+       int             sp_groupcount;  /* Number of groups in the pool. */
+       int             sp_nextgroup;   /* Next group to assign port. */
+       SVCGROUP        sp_groups[SVC_MAXGROUPS]; /* Thread/port groups. */
 } SVCPOOL;
 
 #else

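The new sg_lock above is a struct mtx_padalign: a mutex padded out to a
full cache line, so the per-group locks packed into the sp_groups[]
array do not false-share cache lines between CPUs.  A rough userland
illustration of the layout idea (the 64-byte line size is an assumption
here; the kernel derives it from its CACHE_LINE_SIZE constant):

    #include <stdio.h>

    #define LINE_SIZE       64      /* assumed cache line size */

    /* A lock padded to a full cache line, as mtx_padalign is. */
    struct padded_lock {
            int lock_word;          /* stand-in for real mutex state */
    } __attribute__((aligned(LINE_SIZE)));

    struct group {
            struct padded_lock lock;        /* one cache line per lock */
            int work;
    };

    int
    main(void)
    {
            struct group groups[4];

            /* Adjacent groups' locks land on distinct cache lines. */
            printf("lock stride: %zu bytes\n",
                (size_t)((char *)&groups[1].lock -
                (char *)&groups[0].lock));
            return (0);
    }
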
Modified: head/sys/rpc/svc_generic.c
==============================================================================
--- head/sys/rpc/svc_generic.c  Sun Jun  8 10:56:25 2014        (r267227)
+++ head/sys/rpc/svc_generic.c  Sun Jun  8 11:19:32 2014        (r267228)
@@ -86,7 +86,8 @@ svc_create(
        rpcvers_t versnum,              /* Version number */
        const char *nettype)            /* Networktype token */
 {
-       int num = 0;
+       int g, num = 0;
+       SVCGROUP *grp;
        SVCXPRT *xprt;
        struct netconfig *nconf;
        void *handle;
@@ -96,11 +97,14 @@ svc_create(
                return (0);
        }
        while ((nconf = __rpc_getconf(handle)) != NULL) {
-               mtx_lock(&pool->sp_lock);
-               TAILQ_FOREACH(xprt, &pool->sp_xlist, xp_link) {
-                       if (strcmp(xprt->xp_netid, nconf->nc_netid) == 0) {
+               for (g = 0; g < SVC_MAXGROUPS; g++) {
+                       grp = &pool->sp_groups[g];
+                       mtx_lock(&grp->sg_lock);
+                       TAILQ_FOREACH(xprt, &grp->sg_xlist, xp_link) {
+                               if (strcmp(xprt->xp_netid, nconf->nc_netid))
+                                       continue;
                                /* Found an old one, use it */
-                               mtx_unlock(&pool->sp_lock);
+                               mtx_unlock(&grp->sg_lock);
                                (void) rpcb_unset(prognum, versnum, nconf);
                                if (svc_reg(xprt, prognum, versnum,
                                        dispatch, nconf) == FALSE) {
@@ -108,15 +112,15 @@ svc_create(
                "svc_create: could not register prog %u vers %u on %s\n",
                                        (unsigned)prognum, (unsigned)versnum,
                                         nconf->nc_netid);
-                                       mtx_lock(&pool->sp_lock);
+                                       mtx_lock(&grp->sg_lock);
                                } else {
                                        num++;
-                                       mtx_lock(&pool->sp_lock);
+                                       mtx_lock(&grp->sg_lock);
                                        break;
                                }
                        }
+                       mtx_unlock(&grp->sg_lock);
                }
-               mtx_unlock(&pool->sp_lock);
                if (xprt == NULL) {
                        /* It was not found. Now create a new one */
                        xprt = svc_tp_create(pool, dispatch, prognum, versnum,
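
With this change, the per-pool sysctl tree grows a read-only "threads"
count (summed over the groups by svcpool_threads_sysctl()) and a
read-only "groups" count.  Assuming an NFS server whose pool is
registered under vfs.nfsd (the usual base for nfsd's minthreads and
maxthreads knobs), they could be read from userland with a sketch like:

    #include <sys/types.h>
    #include <sys/sysctl.h>
    #include <stdio.h>

    int
    main(void)
    {
            int threads, groups;
            size_t len;

            /* "vfs.nfsd.*" assumes the nfsd pool; adjust for others. */
            len = sizeof(threads);
            if (sysctlbyname("vfs.nfsd.threads", &threads, &len,
                NULL, 0) == 0)
                    printf("threads: %d\n", threads);
            len = sizeof(groups);
            if (sysctlbyname("vfs.nfsd.groups", &groups, &len,
                NULL, 0) == 0)
                    printf("groups: %d\n", groups);
            return (0);
    }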