Author: jhb
Date: Tue Jun 21 22:19:06 2016
New Revision: 302074
URL: https://svnweb.freebsd.org/changeset/base/302074

Log:
  Account for AIO socket operations in thread/process resource usage.
  
  File and disk-backed I/O requests store counts of read/written disk
  blocks in each AIO job so that they can be charged to the thread that
  completes an AIO request via aio_return() or aio_waitcomplete().  This
  change extends AIO jobs to store counts of received/sent messages and
  updates socket backends to set these counts accordingly.  Note that
  the socket backends are careful to only charge a single message for
  each AIO request even though a single request on a blocking socket might
  invoke sosend or soreceive multiple times.  This is to mimic the
  resource accounting of synchronous read/write.
  
  Adjust the UNIX socketpair AIO test to verify that the message resource
  usage counts update accordingly for aio_read and aio_write.
  
  Approved by:  re (hrs)
  Sponsored by: Chelsio Communications
  Differential Revision:        https://reviews.freebsd.org/D6911

Modified:
  head/sys/dev/cxgbe/tom/t4_ddp.c
  head/sys/kern/sys_socket.c
  head/sys/kern/vfs_aio.c
  head/sys/sys/aio.h
  head/tests/sys/aio/aio_test.c

Modified: head/sys/dev/cxgbe/tom/t4_ddp.c
==============================================================================
--- head/sys/dev/cxgbe/tom/t4_ddp.c     Tue Jun 21 21:55:03 2016        (r302073)
+++ head/sys/dev/cxgbe/tom/t4_ddp.c     Tue Jun 21 22:19:06 2016        (r302074)
@@ -360,6 +360,8 @@ insert_ddp_data(struct toepcb *toep, uin
                placed = n;
                if (placed > job->uaiocb.aio_nbytes - copied)
                        placed = job->uaiocb.aio_nbytes - copied;
+               if (placed > 0)
+                       job->msgrcv = 1;
                if (!aio_clear_cancel_function(job)) {
                        /*
                         * Update the copied length for when
@@ -602,6 +604,7 @@ handle_ddp_data(struct toepcb *toep, __b
        toep->rx_credits += len;
 #endif
 
+       job->msgrcv = 1;
        if (db->cancel_pending) {
                /*
                 * Update the job's length but defer completion to the
@@ -756,6 +759,8 @@ handle_ddp_close(struct toepcb *toep, st
                placed = len;
                if (placed > job->uaiocb.aio_nbytes - copied)
                        placed = job->uaiocb.aio_nbytes - copied;
+               if (placed > 0)
+                       job->msgrcv = 1;
                if (!aio_clear_cancel_function(job)) {
                        /*
                         * Update the copied length for when
@@ -1458,6 +1463,7 @@ sbcopy:
        if (copied != 0) {
                sbdrop_locked(sb, copied);
                job->aio_received += copied;
+               job->msgrcv = 1;
                copied = job->aio_received;
                inp = sotoinpcb(so);
                if (!INP_TRY_WLOCK(inp)) {

Modified: head/sys/kern/sys_socket.c
==============================================================================
--- head/sys/kern/sys_socket.c  Tue Jun 21 21:55:03 2016        (r302073)
+++ head/sys/kern/sys_socket.c  Tue Jun 21 22:19:06 2016        (r302074)
@@ -563,6 +563,7 @@ soaio_process_job(struct socket *so, str
        struct uio uio;
        struct iovec iov;
        size_t cnt, done;
+       long ru_before;
        int error, flags;
 
        SOCKBUF_UNLOCK(sb);
@@ -585,23 +586,33 @@ retry:
        uio.uio_td = td;
        flags = MSG_NBIO;
 
-       /* TODO: Charge ru_msg* to job. */
+       /*
+        * For resource usage accounting, only count a completed request
+        * as a single message to avoid counting multiple calls to
+        * sosend/soreceive on a blocking socket.
+        */
 
        if (sb == &so->so_rcv) {
                uio.uio_rw = UIO_READ;
+               ru_before = td->td_ru.ru_msgrcv;
 #ifdef MAC
                error = mac_socket_check_receive(fp->f_cred, so);
                if (error == 0)
 
 #endif
                        error = soreceive(so, NULL, &uio, NULL, NULL, &flags);
+               if (td->td_ru.ru_msgrcv != ru_before)
+                       job->msgrcv = 1;
        } else {
                uio.uio_rw = UIO_WRITE;
+               ru_before = td->td_ru.ru_msgsnd;
 #ifdef MAC
                error = mac_socket_check_send(fp->f_cred, so);
                if (error == 0)
 #endif
                        error = sosend(so, NULL, &uio, NULL, NULL, flags, td);
+               if (td->td_ru.ru_msgsnd != ru_before)
+                       job->msgsnd = 1;
                if (error == EPIPE && (so->so_options & SO_NOSIGPIPE) == 0) {
                        PROC_LOCK(job->userproc);
                        kern_psignal(job->userproc, SIGPIPE);

Modified: head/sys/kern/vfs_aio.c
==============================================================================
--- head/sys/kern/vfs_aio.c     Tue Jun 21 21:55:03 2016        (r302073)
+++ head/sys/kern/vfs_aio.c     Tue Jun 21 22:19:06 2016        (r302074)
@@ -743,9 +743,11 @@ aio_process_rw(struct kaiocb *job)
        struct uio auio;
        struct iovec aiov;
        ssize_t cnt;
+       long msgsnd_st, msgsnd_end;
+       long msgrcv_st, msgrcv_end;
+       long oublock_st, oublock_end;
+       long inblock_st, inblock_end;
        int error;
-       int oublock_st, oublock_end;
-       int inblock_st, inblock_end;
 
        KASSERT(job->uaiocb.aio_lio_opcode == LIO_READ ||
            job->uaiocb.aio_lio_opcode == LIO_WRITE,
@@ -769,8 +771,11 @@ aio_process_rw(struct kaiocb *job)
        auio.uio_segflg = UIO_USERSPACE;
        auio.uio_td = td;
 
+       msgrcv_st = td->td_ru.ru_msgrcv;
+       msgsnd_st = td->td_ru.ru_msgsnd;
        inblock_st = td->td_ru.ru_inblock;
        oublock_st = td->td_ru.ru_oublock;
+
        /*
         * aio_aqueue() acquires a reference to the file that is
         * released in aio_free_entry().
@@ -787,11 +792,15 @@ aio_process_rw(struct kaiocb *job)
                auio.uio_rw = UIO_WRITE;
                error = fo_write(fp, &auio, fp->f_cred, FOF_OFFSET, td);
        }
+       msgrcv_end = td->td_ru.ru_msgrcv;
+       msgsnd_end = td->td_ru.ru_msgsnd;
        inblock_end = td->td_ru.ru_inblock;
        oublock_end = td->td_ru.ru_oublock;
 
-       job->inputcharge = inblock_end - inblock_st;
-       job->outputcharge = oublock_end - oublock_st;
+       job->msgrcv = msgrcv_end - msgrcv_st;
+       job->msgsnd = msgsnd_end - msgsnd_st;
+       job->inblock = inblock_end - inblock_st;
+       job->outblock = oublock_end - oublock_st;
 
        if ((error) && (auio.uio_resid != cnt)) {
                if (error == ERESTART || error == EINTR || error == EWOULDBLOCK)
@@ -1805,13 +1814,10 @@ kern_aio_return(struct thread *td, struc
                status = job->uaiocb._aiocb_private.status;
                error = job->uaiocb._aiocb_private.error;
                td->td_retval[0] = status;
-               if (job->uaiocb.aio_lio_opcode == LIO_WRITE) {
-                       td->td_ru.ru_oublock += job->outputcharge;
-                       job->outputcharge = 0;
-               } else if (job->uaiocb.aio_lio_opcode == LIO_READ) {
-                       td->td_ru.ru_inblock += job->inputcharge;
-                       job->inputcharge = 0;
-               }
+               td->td_ru.ru_oublock += job->outblock;
+               td->td_ru.ru_inblock += job->inblock;
+               td->td_ru.ru_msgsnd += job->msgsnd;
+               td->td_ru.ru_msgrcv += job->msgrcv;
                aio_free_entry(job);
                AIO_UNLOCK(ki);
                ops->store_error(ujob, error);
@@ -2327,9 +2333,9 @@ aio_physwakeup(struct bio *bp)
                error = bp->bio_error;
        nblks = btodb(nbytes);
        if (job->uaiocb.aio_lio_opcode == LIO_WRITE)
-               job->outputcharge += nblks;
+               job->outblock += nblks;
        else
-               job->inputcharge += nblks;
+               job->inblock += nblks;
 
        if (error)
                aio_complete(job, -1, error);
@@ -2395,13 +2401,10 @@ kern_aio_waitcomplete(struct thread *td,
                status = job->uaiocb._aiocb_private.status;
                error = job->uaiocb._aiocb_private.error;
                td->td_retval[0] = status;
-               if (job->uaiocb.aio_lio_opcode == LIO_WRITE) {
-                       td->td_ru.ru_oublock += job->outputcharge;
-                       job->outputcharge = 0;
-               } else if (job->uaiocb.aio_lio_opcode == LIO_READ) {
-                       td->td_ru.ru_inblock += job->inputcharge;
-                       job->inputcharge = 0;
-               }
+               td->td_ru.ru_oublock += job->outblock;
+               td->td_ru.ru_inblock += job->inblock;
+               td->td_ru.ru_msgsnd += job->msgsnd;
+               td->td_ru.ru_msgrcv += job->msgrcv;
                aio_free_entry(job);
                AIO_UNLOCK(ki);
                ops->store_aiocb(ujobp, ujob);

Modified: head/sys/sys/aio.h
==============================================================================
--- head/sys/sys/aio.h  Tue Jun 21 21:55:03 2016        (r302073)
+++ head/sys/sys/aio.h  Tue Jun 21 22:19:06 2016        (r302074)
@@ -119,8 +119,10 @@ struct kaiocb {
        TAILQ_ENTRY(kaiocb) plist;      /* (a) lists of pending / done jobs */
        TAILQ_ENTRY(kaiocb) allist;     /* (a) list of all jobs in proc */
        int     jobflags;               /* (a) job flags */
-       int     inputcharge;            /* (*) input blocks */
-       int     outputcharge;           /* (*) output blocks */
+       int     inblock;                /* (*) input blocks */
+       int     outblock;               /* (*) output blocks */
+       int     msgsnd;                 /* (*) messages sent */
+       int     msgrcv;                 /* (*) messages received */
        struct  proc *userproc;         /* (*) user process */
        struct  ucred *cred;            /* (*) active credential when created */
        struct  file *fd_file;          /* (*) pointer to file structure */

Modified: head/tests/sys/aio/aio_test.c
==============================================================================
--- head/tests/sys/aio/aio_test.c       Tue Jun 21 21:55:03 2016        (r302073)
+++ head/tests/sys/aio/aio_test.c       Tue Jun 21 22:19:06 2016        (r302074)
@@ -40,6 +40,7 @@
 
 #include <sys/param.h>
 #include <sys/module.h>
+#include <sys/resource.h>
 #include <sys/socket.h>
 #include <sys/stat.h>
 #include <sys/mdioctl.h>
@@ -455,6 +456,7 @@ ATF_TC_BODY(aio_unix_socketpair_test, tc
 {
        struct aio_unix_socketpair_arg arg;
        struct aio_context ac;
+       struct rusage ru_before, ru_after;
        int sockets[2];
 
        ATF_REQUIRE_KERNEL_MODULE("aio");
@@ -467,8 +469,17 @@ ATF_TC_BODY(aio_unix_socketpair_test, tc
        aio_context_init(&ac, sockets[0],
            sockets[1], UNIX_SOCKETPAIR_LEN, UNIX_SOCKETPAIR_TIMEOUT,
            aio_unix_socketpair_cleanup, &arg);
+       ATF_REQUIRE_MSG(getrusage(RUSAGE_SELF, &ru_before) != -1,
+           "getrusage failed: %s", strerror(errno));
        aio_write_test(&ac);
+       ATF_REQUIRE_MSG(getrusage(RUSAGE_SELF, &ru_after) != -1,
+           "getrusage failed: %s", strerror(errno));
+       ATF_REQUIRE(ru_after.ru_msgsnd == ru_before.ru_msgsnd + 1);
+       ru_before = ru_after;
        aio_read_test(&ac);
+       ATF_REQUIRE_MSG(getrusage(RUSAGE_SELF, &ru_after) != -1,
+           "getrusage failed: %s", strerror(errno));
+       ATF_REQUIRE(ru_after.ru_msgrcv == ru_before.ru_msgrcv + 1);
 
        aio_unix_socketpair_cleanup(&arg);
 }
_______________________________________________
svn-src-head@freebsd.org mailing list
https://lists.freebsd.org/mailman/listinfo/svn-src-head
To unsubscribe, send any mail to "svn-src-head-unsubscr...@freebsd.org"

Reply via email to