A service transport can be processed by a service thread while racing
with per-net service shutdown, as shown below:

CPU#0:                            CPU#1:

svc_recv                        svc_close_net
svc_get_next_xprt (list_del_init(xpt_ready))
                            svc_close_list (set XPT_BUSY and XPT_CLOSE)
                            svc_clear_pools (xprt already taken by CPU#0)
                            svc_delete_xprt (set XPT_DEAD)
svc_handle_xprt (XPT_CLOSE is set => svc_delete_xprt())
BUG()
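
The BUG() at the end of the diagram is the "only do this once" check in
svc_delete_xprt(): the racing second call finds XPT_DEAD already set.
Roughly (an abridged excerpt for illustration only, not part of this
patch):

        /* svc_delete_xprt(), abridged: the second caller, racing as in
         * the diagram above, finds XPT_DEAD already set and hits BUG().
         */
        if (test_and_set_bit(XPT_DEAD, &xprt->xpt_flags))
                BUG();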

There are different ways to solve this problem. This patch probably
does not implement the best one, but hopefully a simple one: it
protects the critical section (dequeuing a pending transport and
enqueuing it back to the pool) with a per-service rw semaphore taken
for read. On per-net transport shutdown, the semaphore is taken for
write.
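
In outline, the scheme looks like this (a simplified sketch of what the
diff below does in full; the pool spinlock and the sleep/wakeup handling
are omitted here):

/* Reader side: a service thread holds sv_xprt_sem for read from the
 * attempt to dequeue a pending transport until svc_xprt_release() is
 * done with it.
 */
struct svc_xprt *svc_get_next_xprt(struct svc_rqst *rqstp, long timeout)
{
        struct svc_serv *serv = rqstp->rq_server;
        struct svc_xprt *xprt;

        percpu_down_read(&serv->sv_xprt_sem);
        xprt = svc_xprt_dequeue(rqstp->rq_pool);
        if (!xprt)
                xprt = ERR_PTR(-EAGAIN); /* the real code sleeps up to @timeout first */
        if (IS_ERR(xprt))
                percpu_up_read(&serv->sv_xprt_sem);
        /* On success the semaphore is released later, in svc_xprt_release(). */
        return xprt;
}

/* Writer side: per-net shutdown takes the same semaphore for write, so
 * it cannot run while any thread sits between dequeue and release.
 */
void svc_close_net(struct svc_serv *serv, struct net *net)
{
        percpu_down_write(&serv->sv_xprt_sem);
        svc_close_list(serv, &serv->sv_tempsocks, net);
        svc_close_list(serv, &serv->sv_permsocks, net);
        svc_clear_pools(serv, net);
        percpu_up_write(&serv->sv_xprt_sem);
}

The per-cpu flavour keeps the read side cheap on the per-request hot
path, while the expensive write side is only taken on shutdown.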

Note: the PERCPU_RWSEM config option is now selected by SUNRPC.

Signed-off-by: Stanislav Kinsbursky <skinsbur...@parallels.com>
---
 include/linux/sunrpc/svc.h |    2 ++
 net/sunrpc/Kconfig         |    1 +
 net/sunrpc/svc.c           |    2 ++
 net/sunrpc/svc_xprt.c      |   33 +++++++++++++++++++++++++++------
 4 files changed, 32 insertions(+), 6 deletions(-)

diff --git a/include/linux/sunrpc/svc.h b/include/linux/sunrpc/svc.h
index 1f0216b..2031d14 100644
--- a/include/linux/sunrpc/svc.h
+++ b/include/linux/sunrpc/svc.h
@@ -103,6 +103,8 @@ struct svc_serv {
                                                 * entries in the svc_cb_list */
        struct svc_xprt         *sv_bc_xprt;    /* callback on fore channel */
 #endif /* CONFIG_SUNRPC_BACKCHANNEL */
+
+       struct percpu_rw_semaphore      sv_xprt_sem; /* sem to sync against per-net transports shutdown */
 };
 
 /*
diff --git a/net/sunrpc/Kconfig b/net/sunrpc/Kconfig
index 03d03e3..5e6548c 100644
--- a/net/sunrpc/Kconfig
+++ b/net/sunrpc/Kconfig
@@ -1,5 +1,6 @@
 config SUNRPC
        tristate
+       select PERCPU_RWSEM
 
 config SUNRPC_GSS
        tristate
diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c
index b9ba2a8..ef74c72 100644
--- a/net/sunrpc/svc.c
+++ b/net/sunrpc/svc.c
@@ -483,6 +483,8 @@ __svc_create(struct svc_program *prog, unsigned int bufsize, int npools,
        if (svc_uses_rpcbind(serv) && (!serv->sv_shutdown))
                serv->sv_shutdown = svc_rpcb_cleanup;
 
+       percpu_init_rwsem(&serv->sv_xprt_sem);
+
        return serv;
 }
 
diff --git a/net/sunrpc/svc_xprt.c b/net/sunrpc/svc_xprt.c
index 5a9d40c..855a6ba 100644
--- a/net/sunrpc/svc_xprt.c
+++ b/net/sunrpc/svc_xprt.c
@@ -471,6 +471,8 @@ static void svc_xprt_release(struct svc_rqst *rqstp)
        svc_reserve(rqstp, 0);
        rqstp->rq_xprt = NULL;
 
+       percpu_up_read(&rqstp->rq_server->sv_xprt_sem);
+
        svc_xprt_put(xprt);
 }
 
@@ -618,12 +620,14 @@ struct svc_xprt *svc_get_next_xprt(struct svc_rqst *rqstp, long timeout)
        struct svc_pool         *pool = rqstp->rq_pool;
        DECLARE_WAITQUEUE(wait, current);
        long                    time_left;
+       struct svc_serv         *serv = rqstp->rq_server;
 
        /* Normally we will wait up to 5 seconds for any required
         * cache information to be provided.
         */
        rqstp->rq_chandle.thread_wait = 5*HZ;
 
+       percpu_down_read(&serv->sv_xprt_sem);
        spin_lock_bh(&pool->sp_lock);
        xprt = svc_xprt_dequeue(pool);
        if (xprt) {
@@ -640,7 +644,8 @@ struct svc_xprt *svc_get_next_xprt(struct svc_rqst *rqstp, long timeout)
                if (pool->sp_task_pending) {
                        pool->sp_task_pending = 0;
                        spin_unlock_bh(&pool->sp_lock);
-                       return ERR_PTR(-EAGAIN);
+                       xprt = ERR_PTR(-EAGAIN);
+                       goto out_err;
                }
                /* No data pending. Go to sleep */
                svc_thread_enqueue(pool, rqstp);
@@ -661,16 +666,19 @@ struct svc_xprt *svc_get_next_xprt(struct svc_rqst *rqstp, long timeout)
                if (kthread_should_stop()) {
                        set_current_state(TASK_RUNNING);
                        spin_unlock_bh(&pool->sp_lock);
-                       return ERR_PTR(-EINTR);
+                       xprt = ERR_PTR(-EINTR);
+                       goto out_err;
                }
 
                add_wait_queue(&rqstp->rq_wait, &wait);
                spin_unlock_bh(&pool->sp_lock);
+               percpu_up_read(&serv->sv_xprt_sem);
 
                time_left = schedule_timeout(timeout);
 
                try_to_freeze();
 
+               percpu_down_read(&serv->sv_xprt_sem);
                spin_lock_bh(&pool->sp_lock);
                remove_wait_queue(&rqstp->rq_wait, &wait);
                if (!time_left)
@@ -681,13 +689,24 @@ struct svc_xprt *svc_get_next_xprt(struct svc_rqst *rqstp, long timeout)
                        svc_thread_dequeue(pool, rqstp);
                        spin_unlock_bh(&pool->sp_lock);
                        dprintk("svc: server %p, no data yet\n", rqstp);
-                       if (signalled() || kthread_should_stop())
-                               return ERR_PTR(-EINTR);
-                       else
-                               return ERR_PTR(-EAGAIN);
+                       if (signalled() || kthread_should_stop()) {
+                               xprt = ERR_PTR(-EINTR);
+                               goto out_err;
+                       } else {
+                               xprt = ERR_PTR(-EAGAIN);
+                               goto out_err;
+                       }
                }
        }
        spin_unlock_bh(&pool->sp_lock);
+out_err:
+       if (IS_ERR(xprt))
+               /*
+                * Note: we release the semaphore here only if an error
+                * occurred. Otherwise we hold it until transport processing
+                * is completed in svc_xprt_release().
+                */
+               percpu_up_read(&serv->sv_xprt_sem);
        return xprt;
 }
 
@@ -1020,10 +1039,12 @@ static void svc_clear_list(struct svc_serv *serv, struct list_head *xprt_list, s
 
 void svc_close_net(struct svc_serv *serv, struct net *net)
 {
+       percpu_down_write(&serv->sv_xprt_sem);
        svc_close_list(serv, &serv->sv_tempsocks, net);
        svc_close_list(serv, &serv->sv_permsocks, net);
 
        svc_clear_pools(serv, net);
+       percpu_up_write(&serv->sv_xprt_sem);
        /*
         * At this point the sp_sockets lists will stay empty, since
         * svc_xprt_enqueue will not add new entries without taking the
