Author: rmacklem
Date: Sat Dec  8 00:29:16 2012
New Revision: 244008
URL: http://svnweb.freebsd.org/changeset/base/244008

Log:
  Add support for backchannels to the kernel RPC. Backchannels
  are used by NFSv4.1 for callbacks. A backchannel is a connection
  established by the client, but used for RPCs done by the server
  on the client (callbacks). As a result, this patch mixes some
  client side calls in the server side and vice versa. Some
  definitions in the .c files were extracted out into a file called
  krpc.h, so that they could be included in multiple .c files.
  This code has been in projects/nfsv4.1-client for some time.
  Although no one has given it a formal review, I believe kib@
  has taken a look at it.

Added:
  head/sys/rpc/krpc.h   (contents, props changed)
Modified:
  head/sys/rpc/clnt.h
  head/sys/rpc/clnt_rc.c
  head/sys/rpc/clnt_vc.c
  head/sys/rpc/svc.h
  head/sys/rpc/svc_vc.c

Modified: head/sys/rpc/clnt.h
==============================================================================
--- head/sys/rpc/clnt.h Sat Dec  8 00:28:16 2012        (r244007)
+++ head/sys/rpc/clnt.h Sat Dec  8 00:29:16 2012        (r244008)
@@ -372,6 +372,7 @@ enum clnt_stat clnt_call_private(CLIENT 
 #define CLGET_RETRIES          26      /* get retry count for reconnect */
 #define CLSET_PRIVPORT         27      /* set privileged source port flag */
 #define CLGET_PRIVPORT         28      /* get privileged source port flag */
+#define CLSET_BACKCHANNEL      29      /* set backchannel for socket */
 #endif
 
 

Modified: head/sys/rpc/clnt_rc.c
==============================================================================
--- head/sys/rpc/clnt_rc.c      Sat Dec  8 00:28:16 2012        (r244007)
+++ head/sys/rpc/clnt_rc.c      Sat Dec  8 00:29:16 2012        (r244008)
@@ -45,6 +45,7 @@ __FBSDID("$FreeBSD$");
 
 #include <rpc/rpc.h>
 #include <rpc/rpc_com.h>
+#include <rpc/krpc.h>
 
 static enum clnt_stat clnt_reconnect_call(CLIENT *, struct rpc_callextra *,
     rpcproc_t, struct mbuf *, struct mbuf **, struct timeval);
@@ -67,27 +68,6 @@ static struct clnt_ops clnt_reconnect_op
 
 static int     fake_wchan;
 
-struct rc_data {
-       struct mtx              rc_lock;
-       struct sockaddr_storage rc_addr; /* server address */
-       struct netconfig*       rc_nconf; /* network type */
-       rpcprog_t               rc_prog;  /* program number */
-       rpcvers_t               rc_vers;  /* version number */
-       size_t                  rc_sendsz;
-       size_t                  rc_recvsz;
-       struct timeval          rc_timeout;
-       struct timeval          rc_retry;
-       int                     rc_retries;
-       int                     rc_privport;
-       char                    *rc_waitchan;
-       int                     rc_intr;
-       int                     rc_connecting;
-       int                     rc_closed;
-       struct ucred            *rc_ucred;
-       CLIENT*                 rc_client; /* underlying RPC client */
-       struct rpc_err          rc_err;
-};
-
 CLIENT *
 clnt_reconnect_create(
        struct netconfig *nconf,        /* network type */
@@ -211,6 +191,8 @@ clnt_reconnect_connect(CLIENT *cl)
        CLNT_CONTROL(newclient, CLSET_RETRY_TIMEOUT, &rc->rc_retry);
        CLNT_CONTROL(newclient, CLSET_WAITCHAN, rc->rc_waitchan);
        CLNT_CONTROL(newclient, CLSET_INTERRUPTIBLE, &rc->rc_intr);
+       if (rc->rc_backchannel != NULL)
+               CLNT_CONTROL(newclient, CLSET_BACKCHANNEL, rc->rc_backchannel);
        stat = RPC_SUCCESS;
 
 out:
@@ -385,6 +367,7 @@ static bool_t
 clnt_reconnect_control(CLIENT *cl, u_int request, void *info)
 {
        struct rc_data *rc = (struct rc_data *)cl->cl_private;
+       SVCXPRT *xprt;
 
        if (info == NULL) {
                return (FALSE);
@@ -466,6 +449,13 @@ clnt_reconnect_control(CLIENT *cl, u_int
                *(int *) info = rc->rc_privport;
                break;
 
+       case CLSET_BACKCHANNEL:
+               xprt = (SVCXPRT *)info;
+               SVC_ACQUIRE(xprt);
+               xprt_register(xprt);
+               rc->rc_backchannel = info;
+               break;
+
        default:
                return (FALSE);
        }
@@ -502,9 +492,15 @@ static void
 clnt_reconnect_destroy(CLIENT *cl)
 {
        struct rc_data *rc = (struct rc_data *)cl->cl_private;
+       SVCXPRT *xprt;
 
        if (rc->rc_client)
                CLNT_DESTROY(rc->rc_client);
+       if (rc->rc_backchannel) {
+               xprt = (SVCXPRT *)rc->rc_backchannel;
+               xprt_unregister(xprt);
+               SVC_RELEASE(xprt);
+       }
        crfree(rc->rc_ucred);
        mtx_destroy(&rc->rc_lock);
        mem_free(rc, sizeof(*rc));

Modified: head/sys/rpc/clnt_vc.c
==============================================================================
--- head/sys/rpc/clnt_vc.c      Sat Dec  8 00:28:16 2012        (r244007)
+++ head/sys/rpc/clnt_vc.c      Sat Dec  8 00:29:16 2012        (r244008)
@@ -67,6 +67,7 @@ __FBSDID("$FreeBSD$");
 #include <sys/protosw.h>
 #include <sys/socket.h>
 #include <sys/socketvar.h>
+#include <sys/sx.h>
 #include <sys/syslog.h>
 #include <sys/time.h>
 #include <sys/uio.h>
@@ -77,8 +78,7 @@ __FBSDID("$FreeBSD$");
 
 #include <rpc/rpc.h>
 #include <rpc/rpc_com.h>
-
-#define MCALL_MSG_SIZE 24
+#include <rpc/krpc.h>
 
 struct cmessage {
         struct cmsghdr cmsg;
@@ -106,43 +106,6 @@ static struct clnt_ops clnt_vc_ops = {
        .cl_control =   clnt_vc_control
 };
 
-/*
- * A pending RPC request which awaits a reply. Requests which have
- * received their reply will have cr_xid set to zero and cr_mrep to
- * the mbuf chain of the reply.
- */
-struct ct_request {
-       TAILQ_ENTRY(ct_request) cr_link;
-       uint32_t                cr_xid;         /* XID of request */
-       struct mbuf             *cr_mrep;       /* reply received by upcall */
-       int                     cr_error;       /* any error from upcall */
-       char                    cr_verf[MAX_AUTH_BYTES]; /* reply verf */
-};
-
-TAILQ_HEAD(ct_request_list, ct_request);
-
-struct ct_data {
-       struct mtx      ct_lock;
-       int             ct_threads;     /* number of threads in clnt_vc_call */
-       bool_t          ct_closing;     /* TRUE if we are closing */
-       bool_t          ct_closed;      /* TRUE if we are closed */
-       struct socket   *ct_socket;     /* connection socket */
-       bool_t          ct_closeit;     /* close it on destroy */
-       struct timeval  ct_wait;        /* wait interval in milliseconds */
-       struct sockaddr_storage ct_addr; /* remote addr */
-       struct rpc_err  ct_error;
-       uint32_t        ct_xid;
-       char            ct_mcallc[MCALL_MSG_SIZE]; /* marshalled callmsg */
-       size_t          ct_mpos;        /* pos after marshal */
-       const char      *ct_waitchan;
-       int             ct_waitflag;
-       struct mbuf     *ct_record;     /* current reply record */
-       size_t          ct_record_resid; /* how much left of reply to read */
-       bool_t          ct_record_eor;   /* true if reading last fragment */
-       struct ct_request_list ct_pending;
-       int             ct_upcallrefs;  /* Ref cnt of upcalls in prog. */
-};
-
 static void clnt_vc_upcallsdone(struct ct_data *);
 
 static const char clnt_vc_errstr[] = "%s : %s";
@@ -641,6 +604,7 @@ clnt_vc_control(CLIENT *cl, u_int reques
 {
        struct ct_data *ct = (struct ct_data *)cl->cl_private;
        void *infop = info;
+       SVCXPRT *xprt;
 
        mtx_lock(&ct->ct_lock);
 
@@ -752,6 +716,14 @@ clnt_vc_control(CLIENT *cl, u_int reques
                        *(int *) info = FALSE;
                break;
 
+       case CLSET_BACKCHANNEL:
+               xprt = (SVCXPRT *)info;
+               if (ct->ct_backchannelxprt == NULL) {
+                       xprt->xp_p2 = ct;
+                       ct->ct_backchannelxprt = xprt;
+               }
+               break;
+
        default:
                mtx_unlock(&ct->ct_lock);
                return (FALSE);
@@ -817,10 +789,20 @@ clnt_vc_destroy(CLIENT *cl)
 {
        struct ct_data *ct = (struct ct_data *) cl->cl_private;
        struct socket *so = NULL;
+       SVCXPRT *xprt;
 
        clnt_vc_close(cl);
 
        mtx_lock(&ct->ct_lock);
+       xprt = ct->ct_backchannelxprt;
+       ct->ct_backchannelxprt = NULL;
+       if (xprt != NULL) {
+               mtx_unlock(&ct->ct_lock);       /* To avoid a LOR. */
+               sx_xlock(&xprt->xp_lock);
+               mtx_lock(&ct->ct_lock);
+               xprt->xp_p2 = NULL;
+               xprt_unregister(xprt);
+       }
 
        if (ct->ct_socket) {
                if (ct->ct_closeit) {
@@ -829,6 +811,10 @@ clnt_vc_destroy(CLIENT *cl)
        }
 
        mtx_unlock(&ct->ct_lock);
+       if (xprt != NULL) {
+               sx_xunlock(&xprt->xp_lock);
+               SVC_RELEASE(xprt);
+       }
 
        mtx_destroy(&ct->ct_lock);
        if (so) {
@@ -859,12 +845,15 @@ clnt_vc_soupcall(struct socket *so, void
 {
        struct ct_data *ct = (struct ct_data *) arg;
        struct uio uio;
-       struct mbuf *m;
+       struct mbuf *m, *m2;
        struct ct_request *cr;
        int error, rcvflag, foundreq;
-       uint32_t xid, header;
+       uint32_t xid_plus_direction[2], header;
        bool_t do_read;
+       SVCXPRT *xprt;
+       struct cf_conn *cd;
 
+       CTASSERT(sizeof(xid_plus_direction) == 2 * sizeof(uint32_t));
        ct->ct_upcallrefs++;
        uio.uio_td = curthread;
        do {
@@ -978,45 +967,89 @@ clnt_vc_soupcall(struct socket *so, void
                            && ct->ct_record_eor) {
                                /*
                                 * The XID is in the first uint32_t of
-                                * the reply.
+                                * the reply and the message direction
+                                * is the second one.
                                 */
-                               if (ct->ct_record->m_len < sizeof(xid) &&
+                               if (ct->ct_record->m_len <
+                                   sizeof(xid_plus_direction) &&
                                    m_length(ct->ct_record, NULL) <
-                                   sizeof(xid)) {
+                                   sizeof(xid_plus_direction)) {
                                        m_freem(ct->ct_record);
                                        break;
                                }
-                               m_copydata(ct->ct_record, 0, sizeof(xid),
-                                   (char *)&xid);
-                               xid = ntohl(xid);
-
-                               mtx_lock(&ct->ct_lock);
-                               foundreq = 0;
-                               TAILQ_FOREACH(cr, &ct->ct_pending, cr_link) {
-                                       if (cr->cr_xid == xid) {
+                               m_copydata(ct->ct_record, 0,
+                                   sizeof(xid_plus_direction),
+                                   (char *)xid_plus_direction);
+                               xid_plus_direction[0] =
+                                   ntohl(xid_plus_direction[0]);
+                               xid_plus_direction[1] =
+                                   ntohl(xid_plus_direction[1]);
+                               /* Check message direction. */
+                               if (xid_plus_direction[1] == CALL) {
+                                       /* This is a backchannel request. */
+                                       mtx_lock(&ct->ct_lock);
+                                       xprt = ct->ct_backchannelxprt;
+                                       if (xprt == NULL) {
+                                               mtx_unlock(&ct->ct_lock);
+                                               /* Just throw it away. */
+                                               m_freem(ct->ct_record);
+                                               ct->ct_record = NULL;
+                                       } else {
+                                               cd = (struct cf_conn *)
+                                                   xprt->xp_p1;
+                                               m2 = cd->mreq;
                                                /*
-                                                * This one
-                                                * matches. We leave
-                                                * the reply mbuf in
-                                                * cr->cr_mrep. Set
-                                                * the XID to zero so
-                                                * that we will ignore
-                                                * any duplicaed
-                                                * replies.
+                                                * The requests are chained
+                                                * in the m_nextpkt list.
                                                 */
-                                               cr->cr_xid = 0;
-                                               cr->cr_mrep = ct->ct_record;
-                                               cr->cr_error = 0;
-                                               foundreq = 1;
-                                               wakeup(cr);
-                                               break;
+                                               while (m2 != NULL &&
+                                                   m2->m_nextpkt != NULL)
+                                                       /* Find end of list. */
+                                                       m2 = m2->m_nextpkt;
+                                               if (m2 != NULL)
+                                                       m2->m_nextpkt =
+                                                           ct->ct_record;
+                                               else
+                                                       cd->mreq =
+                                                           ct->ct_record;
+                                               ct->ct_record->m_nextpkt =
+                                                   NULL;
+                                               ct->ct_record = NULL;
+                                               xprt_active(xprt);
+                                               mtx_unlock(&ct->ct_lock);
                                        }
-                               }
-                               mtx_unlock(&ct->ct_lock);
+                               } else {
+                                       mtx_lock(&ct->ct_lock);
+                                       foundreq = 0;
+                                       TAILQ_FOREACH(cr, &ct->ct_pending,
+                                           cr_link) {
+                                               if (cr->cr_xid ==
+                                                   xid_plus_direction[0]) {
+                                                       /*
+                                                        * This one
+                                                        * matches. We leave
+                                                        * the reply mbuf in
+                                                        * cr->cr_mrep. Set
+                                                        * the XID to zero so
+                                                        * that we will ignore
+                                                        * any duplicated
+                                                        * replies.
+                                                        */
+                                                       cr->cr_xid = 0;
+                                                       cr->cr_mrep =
+                                                           ct->ct_record;
+                                                       cr->cr_error = 0;
+                                                       foundreq = 1;
+                                                       wakeup(cr);
+                                                       break;
+                                               }
+                                       }
+                                       mtx_unlock(&ct->ct_lock);
 
-                               if (!foundreq)
-                                       m_freem(ct->ct_record);
-                               ct->ct_record = NULL;
+                                       if (!foundreq)
+                                               m_freem(ct->ct_record);
+                                       ct->ct_record = NULL;
+                               }
                        }
                }
        } while (m);

Added: head/sys/rpc/krpc.h
==============================================================================
--- /dev/null   00:00:00 1970   (empty, because file is newly added)
+++ head/sys/rpc/krpc.h Sat Dec  8 00:29:16 2012        (r244008)
@@ -0,0 +1,111 @@
+/*-
+ * Sun RPC is a product of Sun Microsystems, Inc. and is provided for
+ * unrestricted use provided that this legend is included on all tape
+ * media and as a part of the software program in whole or part.  Users
+ * may copy or modify Sun RPC without charge, but are not authorized
+ * to license or distribute it to anyone else except as part of a product or
+ * program developed by the user.
+ * 
+ * SUN RPC IS PROVIDED AS IS WITH NO WARRANTIES OF ANY KIND INCLUDING THE
+ * WARRANTIES OF DESIGN, MERCHANTIBILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE, OR ARISING FROM A COURSE OF DEALING, USAGE OR TRADE PRACTICE.
+ * 
+ * Sun RPC is provided with no support and without any obligation on the
+ * part of Sun Microsystems, Inc. to assist in its use, correction,
+ * modification or enhancement.
+ * 
+ * SUN MICROSYSTEMS, INC. SHALL HAVE NO LIABILITY WITH RESPECT TO THE
+ * INFRINGEMENT OF COPYRIGHTS, TRADE SECRETS OR ANY PATENTS BY SUN RPC
+ * OR ANY PART THEREOF.
+ * 
+ * In no event will Sun Microsystems, Inc. be liable for any lost revenue
+ * or profits or other special, indirect and consequential damages, even if
+ * Sun has been advised of the possibility of such damages.
+ * 
+ * Sun Microsystems, Inc.
+ * 2550 Garcia Avenue
+ * Mountain View, California  94043
+ *
+ * $FreeBSD$
+ */
+
+#ifndef _RPC_KRPC_H_
+#define        _RPC_KRPC_H_
+
+#ifdef _KERNEL
+/*
+ * Definitions now shared between client and server RPC for backchannels.
+ */
+#define MCALL_MSG_SIZE 24
+
+/*
+ * A pending RPC request which awaits a reply. Requests which have
+ * received their reply will have cr_xid set to zero and cr_mrep to
+ * the mbuf chain of the reply.
+ */
+struct ct_request {
+       TAILQ_ENTRY(ct_request) cr_link;
+       uint32_t                cr_xid;         /* XID of request */
+       struct mbuf             *cr_mrep;       /* reply received by upcall */
+       int                     cr_error;       /* any error from upcall */
+       char                    cr_verf[MAX_AUTH_BYTES]; /* reply verf */
+};
+
+TAILQ_HEAD(ct_request_list, ct_request);
+
+struct rc_data {
+       struct mtx              rc_lock;
+       struct sockaddr_storage rc_addr; /* server address */
+       struct netconfig*       rc_nconf; /* network type */
+       rpcprog_t               rc_prog;  /* program number */
+       rpcvers_t               rc_vers;  /* version number */
+       size_t                  rc_sendsz;
+       size_t                  rc_recvsz;
+       struct timeval          rc_timeout;
+       struct timeval          rc_retry;
+       int                     rc_retries;
+       int                     rc_privport;
+       char                    *rc_waitchan;
+       int                     rc_intr;
+       int                     rc_connecting;
+       int                     rc_closed;
+       struct ucred            *rc_ucred;
+       CLIENT*                 rc_client; /* underlying RPC client */
+       struct rpc_err          rc_err;
+       void                    *rc_backchannel;
+};
+
+struct ct_data {
+       struct mtx      ct_lock;
+       int             ct_threads;     /* number of threads in clnt_vc_call */
+       bool_t          ct_closing;     /* TRUE if we are closing */
+       bool_t          ct_closed;      /* TRUE if we are closed */
+       struct socket   *ct_socket;     /* connection socket */
+       bool_t          ct_closeit;     /* close it on destroy */
+       struct timeval  ct_wait;        /* wait interval in milliseconds */
+       struct sockaddr_storage ct_addr; /* remote addr */
+       struct rpc_err  ct_error;
+       uint32_t        ct_xid;
+       char            ct_mcallc[MCALL_MSG_SIZE]; /* marshalled callmsg */
+       size_t          ct_mpos;        /* pos after marshal */
+       const char      *ct_waitchan;
+       int             ct_waitflag;
+       struct mbuf     *ct_record;     /* current reply record */
+       size_t          ct_record_resid; /* how much left of reply to read */
+       bool_t          ct_record_eor;   /* true if reading last fragment */
+       struct ct_request_list ct_pending;
+       int             ct_upcallrefs;  /* Ref cnt of upcalls in prog. */
+       SVCXPRT         *ct_backchannelxprt; /* xprt for backchannel */
+};
+
+struct cf_conn {  /* kept in xprt->xp_p1 for actual connection */
+       enum xprt_stat strm_stat;
+       struct mbuf *mpending;  /* unparsed data read from the socket */
+       struct mbuf *mreq;      /* current record being built from mpending */
+       uint32_t resid;         /* number of bytes needed for fragment */
+       bool_t eor;             /* reading last fragment of current record */
+};
+
+#endif /* _KERNEL */
+
+#endif /* _RPC_KRPC_H_ */

Modified: head/sys/rpc/svc.h
==============================================================================
--- head/sys/rpc/svc.h  Sat Dec  8 00:28:16 2012        (r244007)
+++ head/sys/rpc/svc.h  Sat Dec  8 00:29:16 2012        (r244008)
@@ -703,6 +703,8 @@ extern SVCXPRT *svc_vc_create(SVCPOOL *,
          * const size_t recvsize;                        -- max recv size
          */
 
+extern SVCXPRT *svc_vc_create_backchannel(SVCPOOL *);
+
 /*
  * Generic TLI create routine
  */

Modified: head/sys/rpc/svc_vc.c
==============================================================================
--- head/sys/rpc/svc_vc.c       Sat Dec  8 00:28:16 2012        (r244007)
+++ head/sys/rpc/svc_vc.c       Sat Dec  8 00:29:16 2012        (r244008)
@@ -65,6 +65,7 @@ __FBSDID("$FreeBSD$");
 
 #include <rpc/rpc.h>
 
+#include <rpc/krpc.h>
 #include <rpc/rpc_com.h>
 
 #include <security/mac/mac_framework.h>
@@ -83,6 +84,14 @@ static bool_t svc_vc_reply(SVCXPRT *, st
 static bool_t svc_vc_control(SVCXPRT *xprt, const u_int rq, void *in);
 static bool_t svc_vc_rendezvous_control (SVCXPRT *xprt, const u_int rq,
     void *in);
+static void svc_vc_backchannel_destroy(SVCXPRT *);
+static enum xprt_stat svc_vc_backchannel_stat(SVCXPRT *);
+static bool_t svc_vc_backchannel_recv(SVCXPRT *, struct rpc_msg *,
+    struct sockaddr **, struct mbuf **);
+static bool_t svc_vc_backchannel_reply(SVCXPRT *, struct rpc_msg *,
+    struct sockaddr *, struct mbuf *);
+static bool_t svc_vc_backchannel_control(SVCXPRT *xprt, const u_int rq,
+    void *in);
 static SVCXPRT *svc_vc_create_conn(SVCPOOL *pool, struct socket *so,
     struct sockaddr *raddr);
 static int svc_vc_accept(struct socket *head, struct socket **sop);
@@ -105,12 +114,12 @@ static struct xp_ops svc_vc_ops = {
        .xp_control =   svc_vc_control
 };
 
-struct cf_conn {  /* kept in xprt->xp_p1 for actual connection */
-       enum xprt_stat strm_stat;
-       struct mbuf *mpending;  /* unparsed data read from the socket */
-       struct mbuf *mreq;      /* current record being built from mpending */
-       uint32_t resid;         /* number of bytes needed for fragment */
-       bool_t eor;             /* reading last fragment of current record */
+static struct xp_ops svc_vc_backchannel_ops = {
+       .xp_recv =      svc_vc_backchannel_recv,
+       .xp_stat =      svc_vc_backchannel_stat,
+       .xp_reply =     svc_vc_backchannel_reply,
+       .xp_destroy =   svc_vc_backchannel_destroy,
+       .xp_control =   svc_vc_backchannel_control
 };
 
 /*
@@ -267,6 +276,28 @@ cleanup_svc_vc_create:
 }
 
 /*
+ * Create a new transport for a backchannel on a clnt_vc socket.
+ */
+SVCXPRT *
+svc_vc_create_backchannel(SVCPOOL *pool)
+{
+       SVCXPRT *xprt = NULL;
+       struct cf_conn *cd = NULL;
+
+       cd = mem_alloc(sizeof(*cd));
+       cd->strm_stat = XPRT_IDLE;
+
+       xprt = svc_xprt_alloc();
+       sx_init(&xprt->xp_lock, "xprt->xp_lock");
+       xprt->xp_pool = pool;
+       xprt->xp_socket = NULL;
+       xprt->xp_p1 = cd;
+       xprt->xp_p2 = NULL;
+       xprt->xp_ops = &svc_vc_backchannel_ops;
+       return (xprt);
+}
+
+/*
  * This does all of the accept except the final call to soaccept. The
  * caller will call soaccept after dropping its locks (soaccept may
  * call malloc).
@@ -452,6 +483,22 @@ svc_vc_destroy(SVCXPRT *xprt)
        mem_free(cd, sizeof(*cd));
 }
 
+static void
+svc_vc_backchannel_destroy(SVCXPRT *xprt)
+{
+       struct cf_conn *cd = (struct cf_conn *)xprt->xp_p1;
+       struct mbuf *m, *m2;
+
+       svc_xprt_free(xprt);
+       m = cd->mreq;
+       while (m != NULL) {
+               m2 = m;
+               m = m->m_nextpkt;
+               m_freem(m2);
+       }
+       mem_free(cd, sizeof(*cd));
+}
+
 /*ARGSUSED*/
 static bool_t
 svc_vc_control(SVCXPRT *xprt, const u_int rq, void *in)
@@ -466,6 +513,13 @@ svc_vc_rendezvous_control(SVCXPRT *xprt,
        return (FALSE);
 }
 
+static bool_t
+svc_vc_backchannel_control(SVCXPRT *xprt, const u_int rq, void *in)
+{
+
+       return (FALSE);
+}
+
 static enum xprt_stat
 svc_vc_stat(SVCXPRT *xprt)
 {
@@ -506,6 +560,19 @@ svc_vc_stat(SVCXPRT *xprt)
        return (XPRT_IDLE);
 }
 
+static enum xprt_stat
+svc_vc_backchannel_stat(SVCXPRT *xprt)
+{
+       struct cf_conn *cd;
+
+       cd = (struct cf_conn *)(xprt->xp_p1);
+
+       if (cd->mreq != NULL)
+               return (XPRT_MOREREQS);
+
+       return (XPRT_IDLE);
+}
+
 static bool_t
 svc_vc_recv(SVCXPRT *xprt, struct rpc_msg *msg,
     struct sockaddr **addrp, struct mbuf **mp)
@@ -680,6 +747,44 @@ svc_vc_recv(SVCXPRT *xprt, struct rpc_ms
 }
 
 static bool_t
+svc_vc_backchannel_recv(SVCXPRT *xprt, struct rpc_msg *msg,
+    struct sockaddr **addrp, struct mbuf **mp)
+{
+       struct cf_conn *cd = (struct cf_conn *) xprt->xp_p1;
+       struct ct_data *ct;
+       struct mbuf *m;
+       XDR xdrs;
+
+       sx_xlock(&xprt->xp_lock);
+       ct = (struct ct_data *)xprt->xp_p2;
+       if (ct == NULL) {
+               sx_xunlock(&xprt->xp_lock);
+               return (FALSE);
+       }
+       mtx_lock(&ct->ct_lock);
+       m = cd->mreq;
+       if (m == NULL) {
+               xprt_inactive(xprt);
+               mtx_unlock(&ct->ct_lock);
+               sx_xunlock(&xprt->xp_lock);
+               return (FALSE);
+       }
+       cd->mreq = m->m_nextpkt;
+       mtx_unlock(&ct->ct_lock);
+       sx_xunlock(&xprt->xp_lock);
+
+       xdrmbuf_create(&xdrs, m, XDR_DECODE);
+       if (! xdr_callmsg(&xdrs, msg)) {
+               XDR_DESTROY(&xdrs);
+               return (FALSE);
+       }
+       *addrp = NULL;
+       *mp = xdrmbuf_getall(&xdrs);
+       XDR_DESTROY(&xdrs);
+       return (TRUE);
+}
+
+static bool_t
 svc_vc_reply(SVCXPRT *xprt, struct rpc_msg *msg,
     struct sockaddr *addr, struct mbuf *m)
 {
@@ -733,6 +838,65 @@ svc_vc_reply(SVCXPRT *xprt, struct rpc_m
 }
 
 static bool_t
+svc_vc_backchannel_reply(SVCXPRT *xprt, struct rpc_msg *msg,
+    struct sockaddr *addr, struct mbuf *m)
+{
+       struct ct_data *ct;
+       XDR xdrs;
+       struct mbuf *mrep;
+       bool_t stat = TRUE;
+       int error;
+
+       /*
+        * Leave space for record mark.
+        */
+       MGETHDR(mrep, M_WAITOK, MT_DATA);
+       mrep->m_len = 0;
+       mrep->m_data += sizeof(uint32_t);
+
+       xdrmbuf_create(&xdrs, mrep, XDR_ENCODE);
+
+       if (msg->rm_reply.rp_stat == MSG_ACCEPTED &&
+           msg->rm_reply.rp_acpt.ar_stat == SUCCESS) {
+               if (!xdr_replymsg(&xdrs, msg))
+                       stat = FALSE;
+               else
+                       xdrmbuf_append(&xdrs, m);
+       } else {
+               stat = xdr_replymsg(&xdrs, msg);
+       }
+
+       if (stat) {
+               m_fixhdr(mrep);
+
+               /*
+                * Prepend a record marker containing the reply length.
+                */
+               M_PREPEND(mrep, sizeof(uint32_t), M_WAITOK);
+               *mtod(mrep, uint32_t *) =
+                       htonl(0x80000000 | (mrep->m_pkthdr.len
+                               - sizeof(uint32_t)));
+               sx_xlock(&xprt->xp_lock);
+               ct = (struct ct_data *)xprt->xp_p2;
+               if (ct != NULL)
+                       error = sosend(ct->ct_socket, NULL, NULL, mrep, NULL,
+                           0, curthread);
+               else
+                       error = EPIPE;
+               sx_xunlock(&xprt->xp_lock);
+               if (!error) {
+                       stat = TRUE;
+               }
+       } else {
+               m_freem(mrep);
+       }
+
+       XDR_DESTROY(&xdrs);
+
+       return (stat);
+}
+
+static bool_t
 svc_vc_null()
 {
 
_______________________________________________
svn-src-all@freebsd.org mailing list
http://lists.freebsd.org/mailman/listinfo/svn-src-all
To unsubscribe, send any mail to "svn-src-all-unsubscr...@freebsd.org"

Reply via email to