Author: dfr
Date: Fri May 15 13:58:45 2009
New Revision: 192142
URL: http://svn.freebsd.org/changeset/base/192142

Log:
  Back port a change to the locking model used to manage active transports from
  FreeBSD-current to avoid a deadlock.
  
  PR:           130628

Modified:
  stable/7/sys/rpc/svc.c
  stable/7/sys/rpc/svc.h
  stable/7/sys/rpc/svc_dg.c
  stable/7/sys/rpc/svc_vc.c

Modified: stable/7/sys/rpc/svc.c
==============================================================================
--- stable/7/sys/rpc/svc.c      Fri May 15 13:26:54 2009        (r192141)
+++ stable/7/sys/rpc/svc.c      Fri May 15 13:58:45 2009        (r192142)
@@ -178,18 +178,23 @@ xprt_active(SVCXPRT *xprt)
 }
 
 void
-xprt_inactive(SVCXPRT *xprt)
+xprt_inactive_locked(SVCXPRT *xprt)
 {
        SVCPOOL *pool = xprt->xp_pool;
 
-       mtx_lock(&pool->sp_lock);
-
        if (xprt->xp_active) {
                TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
                xprt->xp_active = FALSE;
        }
-       wakeup(&pool->sp_active);
+}
 
+void
+xprt_inactive(SVCXPRT *xprt)
+{
+       SVCPOOL *pool = xprt->xp_pool;
+
+       mtx_lock(&pool->sp_lock);
+       xprt_inactive_locked(xprt);
        mtx_unlock(&pool->sp_lock);
 }
 

Modified: stable/7/sys/rpc/svc.h
==============================================================================
--- stable/7/sys/rpc/svc.h      Fri May 15 13:26:54 2009        (r192141)
+++ stable/7/sys/rpc/svc.h      Fri May 15 13:58:45 2009        (r192142)
@@ -47,6 +47,7 @@
 #include <sys/queue.h>
 #include <sys/_lock.h>
 #include <sys/_mutex.h>
+#include <sys/_sx.h>
 #endif
 
 /*
@@ -128,7 +129,7 @@ struct __rpc_svcpool;
  */
 typedef struct __rpc_svcxprt {
 #ifdef _KERNEL
-       struct mtx      xp_lock;
+       struct sx       xp_lock;
        struct __rpc_svcpool *xp_pool;  /* owning pool (see below) */
        TAILQ_ENTRY(__rpc_svcxprt) xp_link;
        TAILQ_ENTRY(__rpc_svcxprt) xp_alink;
@@ -332,6 +333,7 @@ __END_DECLS
 __BEGIN_DECLS
 extern void    xprt_active(SVCXPRT *);
 extern void    xprt_inactive(SVCXPRT *);
+extern void    xprt_inactive_locked(SVCXPRT *);
 __END_DECLS
 
 #endif

Modified: stable/7/sys/rpc/svc_dg.c
==============================================================================
--- stable/7/sys/rpc/svc_dg.c   Fri May 15 13:26:54 2009        (r192141)
+++ stable/7/sys/rpc/svc_dg.c   Fri May 15 13:58:45 2009        (r192142)
@@ -53,6 +53,7 @@ __FBSDID("$FreeBSD$");
 #include <sys/queue.h>
 #include <sys/socket.h>
 #include <sys/socketvar.h>
+#include <sys/sx.h>
 #include <sys/systm.h>
 #include <sys/uio.h>
 
@@ -118,7 +119,7 @@ svc_dg_create(SVCPOOL *pool, struct sock
 
        xprt = mem_alloc(sizeof (SVCXPRT));
        memset(xprt, 0, sizeof (SVCXPRT));
-       mtx_init(&xprt->xp_lock, "xprt->xp_lock", NULL, MTX_DEF);
+       sx_init(&xprt->xp_lock, "xprt->xp_lock");
        xprt->xp_pool = pool;
        xprt->xp_socket = so;
        xprt->xp_p1 = NULL;
@@ -161,6 +162,9 @@ static enum xprt_stat
 svc_dg_stat(SVCXPRT *xprt)
 {
 
+       if (soreadable(xprt->xp_socket))
+               return (XPRT_MOREREQS);
+
        return (XPRT_IDLE);
 }
 
@@ -173,22 +177,17 @@ svc_dg_recv(SVCXPRT *xprt, struct rpc_ms
        int error, rcvflag;
 
        /*
+        * Serialise access to the socket.
+        */
+       sx_xlock(&xprt->xp_lock);
+
+       /*
         * The socket upcall calls xprt_active() which will eventually
         * cause the server to call us here. We attempt to read a
         * packet from the socket and process it. If the read fails,
         * we have drained all pending requests so we call
         * xprt_inactive().
-        *
-        * The lock protects us in the case where a new packet arrives
-        * on the socket after our call to soreceive fails with
-        * EWOULDBLOCK - the call to xprt_active() in the upcall will
-        * happen only after our call to xprt_inactive() which ensures
-        * that we will remain active. It might be possible to use
-        * SOCKBUF_LOCK for this - its not clear to me what locks are
-        * held during the upcall.
         */
-       mtx_lock(&xprt->xp_lock);
-
        uio.uio_resid = 1000000000;
        uio.uio_td = curthread;
        mreq = NULL;
@@ -196,8 +195,19 @@ svc_dg_recv(SVCXPRT *xprt, struct rpc_ms
        error = soreceive(xprt->xp_socket, &raddr, &uio, &mreq, NULL, &rcvflag);
 
        if (error == EWOULDBLOCK) {
-               xprt_inactive(xprt);
-               mtx_unlock(&xprt->xp_lock);
+               /*
+                * We must re-test for readability after taking the
+                * lock to protect us in the case where a new packet
+                * arrives on the socket after our call to soreceive
+                * fails with EWOULDBLOCK. The pool lock protects us
+                * from racing the upcall after our soreadable() call
+                * returns false.
+                */
+               mtx_lock(&xprt->xp_pool->sp_lock);
+               if (!soreadable(xprt->xp_socket))
+                       xprt_inactive_locked(xprt);
+               mtx_unlock(&xprt->xp_pool->sp_lock);
+               sx_xunlock(&xprt->xp_lock);
                return (FALSE);
        }
 
@@ -208,11 +218,11 @@ svc_dg_recv(SVCXPRT *xprt, struct rpc_ms
                xprt->xp_socket->so_rcv.sb_flags &= ~SB_UPCALL;
                SOCKBUF_UNLOCK(&xprt->xp_socket->so_rcv);
                xprt_inactive(xprt);
-               mtx_unlock(&xprt->xp_lock);
+               sx_xunlock(&xprt->xp_lock);
                return (FALSE);
        }
 
-       mtx_unlock(&xprt->xp_lock);
+       sx_xunlock(&xprt->xp_lock);
 
        KASSERT(raddr->sa_len < xprt->xp_rtaddr.maxlen,
            ("Unexpected remote address length"));
@@ -301,7 +311,7 @@ svc_dg_destroy(SVCXPRT *xprt)
 
        xprt_unregister(xprt);
 
-       mtx_destroy(&xprt->xp_lock);
+       sx_destroy(&xprt->xp_lock);
        if (xprt->xp_socket)
                (void)soclose(xprt->xp_socket);
 
@@ -328,7 +338,5 @@ svc_dg_soupcall(struct socket *so, void 
 {
        SVCXPRT *xprt = (SVCXPRT *) arg;
 
-       mtx_lock(&xprt->xp_lock);
        xprt_active(xprt);
-       mtx_unlock(&xprt->xp_lock);
 }

Modified: stable/7/sys/rpc/svc_vc.c
==============================================================================
--- stable/7/sys/rpc/svc_vc.c   Fri May 15 13:26:54 2009        (r192141)
+++ stable/7/sys/rpc/svc_vc.c   Fri May 15 13:58:45 2009        (r192142)
@@ -54,6 +54,7 @@ __FBSDID("$FreeBSD$");
 #include <sys/queue.h>
 #include <sys/socket.h>
 #include <sys/socketvar.h>
+#include <sys/sx.h>
 #include <sys/systm.h>
 #include <sys/uio.h>
 #include <netinet/tcp.h>
@@ -142,7 +143,7 @@ svc_vc_create(SVCPOOL *pool, struct sock
        }
 
        xprt = mem_alloc(sizeof(SVCXPRT));
-       mtx_init(&xprt->xp_lock, "xprt->xp_lock", NULL, MTX_DEF);
+       sx_init(&xprt->xp_lock, "xprt->xp_lock");
        xprt->xp_pool = pool;
        xprt->xp_socket = so;
        xprt->xp_p1 = NULL;
@@ -219,7 +220,7 @@ svc_vc_create_conn(SVCPOOL *pool, struct
        cd->strm_stat = XPRT_IDLE;
 
        xprt = mem_alloc(sizeof(SVCXPRT));
-       mtx_init(&xprt->xp_lock, "xprt->xp_lock", NULL, MTX_DEF);
+       sx_init(&xprt->xp_lock, "xprt->xp_lock");
        xprt->xp_pool = pool;
        xprt->xp_socket = so;
        xprt->xp_p1 = cd;
@@ -255,9 +256,9 @@ svc_vc_create_conn(SVCPOOL *pool, struct
         * Throw the transport into the active list in case it already
         * has some data buffered.
         */
-       mtx_lock(&xprt->xp_lock);
+       sx_xlock(&xprt->xp_lock);
        xprt_active(xprt);
-       mtx_unlock(&xprt->xp_lock);
+       sx_xunlock(&xprt->xp_lock);
 
        return (xprt);
 cleanup_svc_vc_create:
@@ -347,22 +348,27 @@ svc_vc_rendezvous_recv(SVCXPRT *xprt, st
         * connection from the socket and turn it into a new
         * transport. If the accept fails, we have drained all pending
         * connections so we call xprt_inactive().
-        *
-        * The lock protects us in the case where a new connection arrives
-        * on the socket after our call to accept fails with
-        * EWOULDBLOCK - the call to xprt_active() in the upcall will
-        * happen only after our call to xprt_inactive() which ensures
-        * that we will remain active. It might be possible to use
-        * SOCKBUF_LOCK for this - its not clear to me what locks are
-        * held during the upcall.
         */
-       mtx_lock(&xprt->xp_lock);
+       sx_xlock(&xprt->xp_lock);
 
        error = svc_vc_accept(xprt->xp_socket, &so);
 
        if (error == EWOULDBLOCK) {
-               xprt_inactive(xprt);
-               mtx_unlock(&xprt->xp_lock);
+               /*
+                * We must re-test for new connections after taking
+                * the lock to protect us in the case where a new
+                * connection arrives after our call to accept fails
+                * with EWOULDBLOCK. The pool lock protects us from
+                * racing the upcall after our TAILQ_EMPTY() call
+                * returns false.
+                */
+               ACCEPT_LOCK();
+               mtx_lock(&xprt->xp_pool->sp_lock);
+               if (TAILQ_EMPTY(&xprt->xp_socket->so_comp))
+                       xprt_inactive_locked(xprt);
+               mtx_unlock(&xprt->xp_pool->sp_lock);
+               ACCEPT_UNLOCK();
+               sx_xunlock(&xprt->xp_lock);
                return (FALSE);
        }
 
@@ -373,11 +379,11 @@ svc_vc_rendezvous_recv(SVCXPRT *xprt, st
                xprt->xp_socket->so_rcv.sb_flags &= ~SB_UPCALL;
                SOCKBUF_UNLOCK(&xprt->xp_socket->so_rcv);
                xprt_inactive(xprt);
-               mtx_unlock(&xprt->xp_lock);
+               sx_xunlock(&xprt->xp_lock);
                return (FALSE);
        }
 
-       mtx_unlock(&xprt->xp_lock);
+       sx_xunlock(&xprt->xp_lock);
 
        sa = 0;
        error = soaccept(so, &sa);
@@ -422,7 +428,7 @@ svc_vc_destroy_common(SVCXPRT *xprt)
 
        xprt_unregister(xprt);
 
-       mtx_destroy(&xprt->xp_lock);
+       sx_destroy(&xprt->xp_lock);
        if (xprt->xp_socket)
                (void)soclose(xprt->xp_socket);
 
@@ -483,21 +489,29 @@ svc_vc_stat(SVCXPRT *xprt)
 
        /*
         * Return XPRT_MOREREQS if we have buffered data and we are
-        * mid-record or if we have enough data for a record marker.
+        * mid-record or if we have enough data for a record
+        * marker. Since this is only a hint, we read mpending and
+        * resid outside the lock. We do need to take the lock if we
+        * have to traverse the mbuf chain.
         */
        if (cd->mpending) {
                if (cd->resid)
                        return (XPRT_MOREREQS);
                n = 0;
+               sx_xlock(&xprt->xp_lock);
                m = cd->mpending;
                while (m && n < sizeof(uint32_t)) {
                        n += m->m_len;
                        m = m->m_next;
                }
+               sx_xunlock(&xprt->xp_lock);
                if (n >= sizeof(uint32_t))
                        return (XPRT_MOREREQS);
        }
 
+       if (soreadable(xprt->xp_socket))
+               return (XPRT_MOREREQS);
+
        return (XPRT_IDLE);
 }
 
@@ -509,6 +523,12 @@ svc_vc_recv(SVCXPRT *xprt, struct rpc_ms
        struct mbuf *m;
        int error, rcvflag;
 
+       /*
+        * Serialise access to the socket and our own record parsing
+        * state.
+        */
+       sx_xlock(&xprt->xp_lock);
+
        for (;;) {
                /*
                 * If we have an mbuf chain in cd->mpending, try to parse a
@@ -584,6 +604,7 @@ svc_vc_recv(SVCXPRT *xprt, struct rpc_ms
                                 */
                                xdrmbuf_create(&xprt->xp_xdrreq, cd->mreq, 
XDR_DECODE);
                                cd->mreq = NULL;
+                               sx_xunlock(&xprt->xp_lock);
                                if (! xdr_callmsg(&xprt->xp_xdrreq, msg)) {
                                        XDR_DESTROY(&xprt->xp_xdrreq);
                                        return (FALSE);
@@ -602,17 +623,7 @@ svc_vc_recv(SVCXPRT *xprt, struct rpc_ms
                 * the result in cd->mpending. If the read fails,
                 * we have drained both cd->mpending and the socket so
                 * we can call xprt_inactive().
-                *
-                * The lock protects us in the case where a new packet arrives
-                * on the socket after our call to soreceive fails with
-                * EWOULDBLOCK - the call to xprt_active() in the upcall will
-                * happen only after our call to xprt_inactive() which ensures
-                * that we will remain active. It might be possible to use
-                * SOCKBUF_LOCK for this - its not clear to me what locks are
-                * held during the upcall.
                 */
-               mtx_lock(&xprt->xp_lock);
-
                uio.uio_resid = 1000000000;
                uio.uio_td = curthread;
                m = NULL;
@@ -621,8 +632,20 @@ svc_vc_recv(SVCXPRT *xprt, struct rpc_ms
                    &rcvflag);
 
                if (error == EWOULDBLOCK) {
-                       xprt_inactive(xprt);
-                       mtx_unlock(&xprt->xp_lock);
+                       /*
+                        * We must re-test for readability after
+                        * taking the lock to protect us in the case
+                        * where a new packet arrives on the socket
+                        * after our call to soreceive fails with
+                        * EWOULDBLOCK. The pool lock protects us from
+                        * racing the upcall after our soreadable()
+                        * call returns false.
+                        */
+                       mtx_lock(&xprt->xp_pool->sp_lock);
+                       if (!soreadable(xprt->xp_socket))
+                               xprt_inactive_locked(xprt);
+                       mtx_unlock(&xprt->xp_pool->sp_lock);
+                       sx_xunlock(&xprt->xp_lock);
                        return (FALSE);
                }
 
@@ -634,7 +657,7 @@ svc_vc_recv(SVCXPRT *xprt, struct rpc_ms
                        SOCKBUF_UNLOCK(&xprt->xp_socket->so_rcv);
                        xprt_inactive(xprt);
                        cd->strm_stat = XPRT_DIED;
-                       mtx_unlock(&xprt->xp_lock);
+                       sx_xunlock(&xprt->xp_lock);
                        return (FALSE);
                }
 
@@ -642,8 +665,9 @@ svc_vc_recv(SVCXPRT *xprt, struct rpc_ms
                        /*
                         * EOF - the other end has closed the socket.
                         */
+                       xprt_inactive(xprt);
                        cd->strm_stat = XPRT_DIED;
-                       mtx_unlock(&xprt->xp_lock);
+                       sx_xunlock(&xprt->xp_lock);
                        return (FALSE);
                }
 
@@ -651,8 +675,6 @@ svc_vc_recv(SVCXPRT *xprt, struct rpc_ms
                        m_last(cd->mpending)->m_next = m;
                else
                        cd->mpending = m;
-
-               mtx_unlock(&xprt->xp_lock);
        }
 }
 
@@ -739,9 +761,7 @@ svc_vc_soupcall(struct socket *so, void 
 {
        SVCXPRT *xprt = (SVCXPRT *) arg;
 
-       mtx_lock(&xprt->xp_lock);
        xprt_active(xprt);
-       mtx_unlock(&xprt->xp_lock);
 }
 
 #if 0
_______________________________________________
svn-src-all@freebsd.org mailing list
http://lists.freebsd.org/mailman/listinfo/svn-src-all
To unsubscribe, send any mail to "svn-src-all-unsubscr...@freebsd.org"

Reply via email to