Network asynchronous sending support is implemented in a zero-copy way,
similar to how ->sendpage() works: user pages are pinned with
get_user_pages() and attached directly to skb fragments, so nothing is
copied into kernel buffers on the transmit path.
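
Below is a rough sketch of how a userspace sender could queue one such
asynchronous send. The field usage (id.raw[0] = socket fd, id.raw[1] =
length, ptr = user buffer) mirrors what kevent_naio_enqueue() in this
patch reads, and SO_ASYNC_SOCK is handled in net/core/sock.c below; the
exact struct ukevent layout and the kevent_ctl()/KEVENT_CTL_ADD
submission call are assumptions -- the real control ABI lives in the
core kevent patches on the project page.

        int on = 1;

        /* Route this socket's events through kevent instead of the
         * usual wakeups (SO_ASYNC_SOCK, see net/core/sock.c below). */
        setsockopt(s, SOL_SOCKET, SO_ASYNC_SOCK, &on, sizeof(on));

        struct ukevent uk;
        memset(&uk, 0, sizeof(uk));
        uk.event = KEVENT_SOCKET_SEND;  /* added by this patch */
        uk.id.raw[0] = s;               /* socket file descriptor */
        uk.id.raw[1] = len;             /* number of bytes to send */
        uk.ptr = buf;                   /* user buffer, pinned in the
                                         * kernel with get_user_pages() */

        /* Hypothetical submission wrapper around the /dev/kevent control. */
        kevent_ctl(kevent_fd, KEVENT_CTL_ADD, 1, &uk);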
Here is a benchmark of a system with 10 concurrent sockets used for
sending. The receiving server and the synchronous sender use epoll to
select the active socket.
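
For reference, the synchronous sender's inner loop is just the usual
epoll pattern; this is a minimal sketch with illustrative variable
names, not the actual naio_send source:

        struct epoll_event ev[10];
        int i, n;

        /* Wait for any of the 10 sockets to become writable, then
         * push the next chunk with a plain send(). */
        n = epoll_wait(epfd, ev, 10, -1);
        for (i = 0; i < n; ++i) {
                if (ev[i].events & EPOLLOUT) {
                        ssize_t w = send(ev[i].data.fd, buf, chunk, 0);
                        if (w > 0)
                                total += w;
                }
        }
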
Benchmark results:
[EMAIL PROTECTED] event]# ./naio_send -a 192.168.0.48 -p 1234 -f /dev/kevent -N10 -T1 -F /dev/null
Using sync test with 10 clients.
Written 90 Mb, time 1.005239 sec, speed 89.767987 Mb/sec, CPU usage user:   0, kernel:  24.
Written 179 Mb, time 2.009292 sec, speed 89.430210 Mb/sec, CPU usage user:   0, kernel:  11.
Written 269 Mb, time 3.013318 sec, speed 89.377961 Mb/sec, CPU usage user:   0, kernel:  20.
Written 359 Mb, time 4.017380 sec, speed 89.405479 Mb/sec, CPU usage user:   0, kernel:  18.
Written 449 Mb, time 5.021395 sec, speed 89.522402 Mb/sec, CPU usage user:   0, kernel:  20.
Written 538 Mb, time 6.025448 sec, speed 89.417625 Mb/sec, CPU usage user:   1, kernel:  19.
Written 628 Mb, time 7.029475 sec, speed 89.424797 Mb/sec, CPU usage user:   0, kernel:  19.
Written 717 Mb, time 8.033543 sec, speed 89.365050 Mb/sec, CPU usage user:   1, kernel:  19.
Written 795 Mb, time 9.037552 sec, speed 88.004767 Mb/sec, CPU usage user:   0, kernel:  20.
Written 871 Mb, time 10.041601 sec, speed 86.819292 Mb/sec, CPU usage user:   1, kernel:  20.
Written 953 Mb, time 11.045655 sec, speed 86.341926 Mb/sec, CPU usage user:   1, kernel:  20.
Written 1043 Mb, time 12.049671 sec, speed 86.567457 Mb/sec, CPU usage user:   1, kernel:  20.
Written 1132 Mb, time 13.053734 sec, speed 86.786411 Mb/sec, CPU usage user:   0, kernel:  21.
Written 1216 Mb, time 14.057747 sec, speed 86.541472 Mb/sec, CPU usage user:   1, kernel:  21.
Written 1245 Mb, time 14.433538 sec, speed 86.260144 Mb/sec, CPU usage user:   1, kernel:  21.

[EMAIL PROTECTED] event]# ./naio_send -a 192.168.0.48 -p 1234 -f /dev/kevent -N10 -T0 -F /dev/null
Using async test with 10 clients.
Written 81 Mb, time 1.005705 sec, speed 81.181391 Mb/sec, CPU usage user:   0, kernel:   5.
Written 171 Mb, time 2.009730 sec, speed 85.169634 Mb/sec, CPU usage user:   0, kernel:   4.
Written 260 Mb, time 3.013776 sec, speed 86.346984 Mb/sec, CPU usage user:   0, kernel:   5.
Written 349 Mb, time 4.017804 sec, speed 86.915872 Mb/sec, CPU usage user:   0, kernel:   6.
Written 438 Mb, time 5.021849 sec, speed 87.218871 Mb/sec, CPU usage user:   0, kernel:   5.
Written 527 Mb, time 6.025884 sec, speed 87.556526 Mb/sec, CPU usage user:   0, kernel:   5.
Written 617 Mb, time 7.029937 sec, speed 87.805285 Mb/sec, CPU usage user:   0, kernel:   6.
Written 706 Mb, time 8.033961 sec, speed 87.918766 Mb/sec, CPU usage user:   0, kernel:   6.
Written 795 Mb, time 9.038002 sec, speed 88.015944 Mb/sec, CPU usage user:   0, kernel:   6.
Written 884 Mb, time 10.042040 sec, speed 88.119001 Mb/sec, CPU usage user:   0, kernel:   6.
Written 974 Mb, time 11.046080 sec, speed 88.178199 Mb/sec, CPU usage user:   0, kernel:   6.
Written 1063 Mb, time 12.050118 sec, speed 88.261260 Mb/sec, CPU usage user:   0, kernel:   6.
Written 1148 Mb, time 13.054159 sec, speed 87.956583 Mb/sec, CPU usage user:   0, kernel:   6.
Written 1224 Mb, time 14.058228 sec, speed 87.097292 Mb/sec, CPU usage user:   0, kernel:   6.
Written 1313 Mb, time 15.062235 sec, speed 87.200186 Mb/sec, CPU usage user:   0, kernel:   6.
Written 1402 Mb, time 16.066306 sec, speed 87.307377 Mb/sec, CPU usage user:   0, kernel:   6.
Written 1447 Mb, time 16.580100 sec, speed 87.278716 Mb/sec, CPU usage user:   0, kernel:   6.

A plot comparing the two runs (send_naio_vs_sync.png) is attached.

Patches, userspace utilities, and design and implementation notes can
be found on the project's homepage:

http://tservice.net.ru/~s0mbre/old/?section=projects&item=naio
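
For completeness, completions would be reaped along these lines.
kevent_get_events() is a hypothetical stand-in for the retrieval call
provided by the core kevent patches; only the KEVENT_RET_DONE and
KEVENT_RET_BROKEN semantics are taken from this patch (see
kevent_user_ctl_add() below):

        struct ukevent done[16];
        int i, n;

        /* Fetch completed events (hypothetical helper). */
        n = kevent_get_events(kevent_fd, done, 16, -1);
        for (i = 0; i < n; ++i) {
                if (done[i].ret_flags & KEVENT_RET_BROKEN)
                        fprintf(stderr, "send failed on fd %u\n",
                                        done[i].id.raw[0]);
                else if (done[i].ret_flags & KEVENT_RET_DONE)
                        complete_request(&done[i]); /* illustrative */
        }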


Signed-off-by: Evgeniy Polyakov <[EMAIL PROTECTED]>

diff --git a/include/linux/kevent.h b/include/linux/kevent.h
index 8cf83dc..b219633 100644
--- a/include/linux/kevent.h
+++ b/include/linux/kevent.h
@@ -65,6 +65,7 @@ enum {
 enum {
        KEVENT_SOCKET_RECV      = 0x1,
        KEVENT_SOCKET_ACCEPT    = 0x2,
+       KEVENT_SOCKET_SEND      = 0x4,
 };
 
 /*
@@ -243,13 +244,21 @@ static inline void kevent_inode_remove(s
 }
 #endif /* CONFIG_KEVENT_INODE */
 #ifdef CONFIG_KEVENT_SOCKET
+
+enum {
+       KEVENT_SOCKET_FLAGS_ASYNC = 0,
+       KEVENT_SOCKET_FLAGS_INUSE,
+};
+
 void kevent_socket_notify(struct sock *sock, u32 event);
 int kevent_socket_dequeue(struct kevent *k);
 int kevent_socket_enqueue(struct kevent *k);
+#define sock_async(__sk)       test_bit(KEVENT_SOCKET_FLAGS_ASYNC, &__sk->sk_kevent_flags)
 #else
 static inline void kevent_socket_notify(struct sock *sock, u32 event)
 {
 }
+#define sock_async(__sk)       0
 #endif
 #endif /* __KERNEL__ */
 #endif /* __KEVENT_H */
diff --git a/include/net/sock.h b/include/net/sock.h
index c086188..d571c3f 100644
--- a/include/net/sock.h
+++ b/include/net/sock.h
@@ -213,7 +213,9 @@ struct sock {
        int                     sk_route_caps;
        unsigned long           sk_flags;
        unsigned long           sk_lingertime;
-       unsigned long           sk_async_sock;
+#ifdef CONFIG_KEVENT_SOCKET
+       unsigned long           sk_kevent_flags;
+#endif
        /*
         * The backlog queue is special, it is always used with
         * the per-socket spinlock held and requires low latency
@@ -553,6 +555,9 @@ struct proto {
        
        int                     (*async_recv) (struct sock *sk, 
                                                void *dst, size_t size);
+       int                     (*async_send) (struct sock *sk, 
+                                               struct page **pages, unsigned int poffset,
+                                               size_t size);
 
        /* Keeping track of sk's, looking them up, and port selection methods. */
        void                    (*hash)(struct sock *sk);
diff --git a/include/net/tcp.h b/include/net/tcp.h
index e14e48d..8788625 100644
--- a/include/net/tcp.h
+++ b/include/net/tcp.h
@@ -393,6 +393,7 @@ extern int                  tcp_setsockopt(struct sock 
                                               int optlen);
 extern void                    tcp_set_keepalive(struct sock *sk, int val);
 extern int                     tcp_async_recv(struct sock *sk, void *dst, size_t size);
+extern int                     tcp_async_send(struct sock *sk, struct page **pages, unsigned int poffset, size_t size);
 extern int                     tcp_recvmsg(struct kiocb *iocb, struct sock *sk,
                                            struct msghdr *msg,
                                            size_t len, int nonblock, 
diff --git a/kernel/kevent/kevent.c b/kernel/kevent/kevent.c
index ef08aa7..5c3a141 100644
--- a/kernel/kevent/kevent.c
+++ b/kernel/kevent/kevent.c
@@ -222,7 +222,7 @@ void kevent_requeue(struct kevent *k)
  */
 void kevent_storage_ready(struct kevent_storage *st, kevent_callback_t ready_callback, u32 event)
 {
-       unsigned long flags;
+       //unsigned long flags;
        struct kevent *k, *n;
        unsigned int qlen;
        u32 ev = 0;
diff --git a/kernel/kevent/kevent_naio.c b/kernel/kevent/kevent_naio.c
index 477cbb1..cee289f 100644
--- a/kernel/kevent/kevent_naio.c
+++ b/kernel/kevent/kevent_naio.c
@@ -47,7 +47,7 @@ static int kevent_naio_enqueue(struct ke
        unsigned int size = k->event.id.raw[1];
        int num = size/PAGE_SIZE + 1;
        struct file *file;
-       struct sock *sk;
+       struct sock *sk = NULL;
        int fput_needed;
 
        file = fget_light(k->event.id.raw[0], &fput_needed);
@@ -61,16 +61,18 @@ static int kevent_naio_enqueue(struct ke
        sk = SOCKET_I(file->f_dentry->d_inode)->sk;
 
        err = -ESOCKTNOSUPPORT;
-       if (!sk || !sk->sk_prot->async_recv)
+       if (!sk || !sk->sk_prot->async_recv || !sk->sk_prot->async_send)
                goto err_out_fput;
-       
+
        page = kmalloc(sizeof(struct page *) * num, GFP_KERNEL);
        if (!page)
                return -ENOMEM;
 
        addr = k->event.ptr;
 
+       down_read(&current->mm->mmap_sem);
        err = get_user_pages(current, current->mm, (unsigned long)addr, num, 1, 
0, page, NULL);
+       up_read(&current->mm->mmap_sem);
        if (err <= 0)
                goto err_out_free;
        num = err;
@@ -80,12 +82,10 @@ static int kevent_naio_enqueue(struct ke
        k->priv = page;
 
        err = kevent_socket_enqueue(k);
-       if (err < 0)
+       if (err)
                goto err_out_put_pages;
 
-       err = kevent_naio_callback(k);
-       if (err < 0)
-               goto err_out_put_pages;
+       fput_light(file, fput_needed);
 
        return err;
 
@@ -125,40 +125,47 @@ static int kevent_naio_callback(struct k
        unsigned int size = k->event.id.raw[1];
        unsigned int off = k->event.ret_data[1];
        struct page **pages = k->priv, *page;
-       int ready = 0, num = 0, err = 0;
+       int ready = 0, num = off/PAGE_SIZE, err = 0, send = 0;
        void *ptr, *optr;
        unsigned int len;
 
-       while (size) {
-               num = off / PAGE_SIZE;
+       if (!test_bit(KEVENT_SOCKET_FLAGS_ASYNC, &sk->sk_kevent_flags))
+               return -1;
 
+       if (k->event.event & KEVENT_SOCKET_SEND)
+               send = 1;
+       else if (!(k->event.event & KEVENT_SOCKET_RECV))
+               return -EINVAL;
+
+       /*
+        * sk_prot->async_*() can return either number of bytes processed,
+        * or negative error value, or zero if socket is closed.
+        */
+
+       if (!send) {
                page = pages[num];
 
                optr = ptr = kmap_atomic(page, KM_IRQ0);
-               if (!ptr) {
-                       err = -ENOMEM;
-                       break;
-               }
+               if (!ptr)
+                       return -ENOMEM;
 
                ptr += off % PAGE_SIZE;
                len = min_t(unsigned int, PAGE_SIZE - (ptr - optr), size);
 
-               /*
-                * sk_prot->async_recv() can return either number of bytes read,
-                * or negative error value, or zero if socket is closed.
-                */
                err = sk->sk_prot->async_recv(sk, ptr, len);
 
                kunmap_atomic(optr, KM_IRQ0);
+       } else {
+               len = size;
+               //bh_lock_sock(sk);
+               err = sk->sk_prot->async_send(sk, pages, off, size);
+               //bh_unlock_sock(sk);
+       }
 
-               if (err > 0) {
-                       num++;
-                       size -= err;
-                       off += err;
-               }
-
-               if (err != len)
-                       break;
+       if (err > 0) {
+               num++;
+               size -= err;
+               off += err;
        }
 
        k->event.ret_data[1] = off;
@@ -169,6 +176,10 @@ static int kevent_naio_callback(struct k
 
        if (!size)
                ready = 1;
+#if 0
+       printk("%s: sk=%p, k=%p, size=%4u, off=%4u, err=%3d, ready=%1d.\n",
+                       __func__, sk, k, size, off, err, ready);
+#endif
 
        return ready;
 }
diff --git a/kernel/kevent/kevent_socket.c b/kernel/kevent/kevent_socket.c
index fd74f3b..d179ca8 100644
--- a/kernel/kevent/kevent_socket.c
+++ b/kernel/kevent/kevent_socket.c
@@ -81,11 +81,15 @@ int kevent_socket_enqueue(struct kevent 
        if (err < 0)
                goto err_out_iput;
 
-       err = kevent_socket_callback(k);
+       err = k->callback(k);
+       if (err)
+               goto err_out_dequeue;
 
        fput_light(file, fput_needed);
        return err;
 
+err_out_dequeue:
+       kevent_storage_dequeue(k->st, k);
 err_out_iput:
        iput(inode);
 err_out_fput:
@@ -113,6 +117,8 @@ int kevent_init_socket(struct kevent *k)
 
 void kevent_socket_notify(struct sock *sk, u32 event)
 {
-       if (sk->sk_socket)
+       if (sk->sk_socket && !test_and_set_bit(KEVENT_SOCKET_FLAGS_INUSE, &sk->sk_kevent_flags)) {
                kevent_storage_ready(&SOCK_INODE(sk->sk_socket)->st, NULL, event);
+               clear_bit(KEVENT_SOCKET_FLAGS_INUSE, &sk->sk_kevent_flags);
+       }
 }
diff --git a/kernel/kevent/kevent_user.c b/kernel/kevent/kevent_user.c
index ae776c5..e197786 100644
--- a/kernel/kevent/kevent_user.c
+++ b/kernel/kevent/kevent_user.c
@@ -331,6 +331,10 @@ static int kevent_user_ctl_add(struct ke
        orig = (void __user *)arg;
        ctl_addr = (void __user *)(arg - sizeof(struct kevent_user_control));
 
+       err = -ENFILE;
+       if (u->kevent_num + ctl->num >= 1000)
+               goto err_out_remove;
+
        for (i=0; i<ctl->num; ++i) {
                k = kevent_alloc(GFP_KERNEL);
                if (!k) {
@@ -359,18 +363,18 @@ static int kevent_user_ctl_add(struct ke
 
                err = kevent_enqueue(k);
                if (err) {
+                       if (err < 0)
+                               k->event.ret_flags |= KEVENT_RET_BROKEN;
+                       k->event.ret_flags |= KEVENT_RET_DONE;
                        if (copy_to_user(orig, &k->event, sizeof(struct ukevent)))
                                cerr = -EINVAL;
 
                        orig += sizeof(struct ukevent);
 
-                       if (err < 0)
-                               kevent_free(k);
                        num++;
                        k->event.ret_flags = 0;
-               }  
-               
-               if (err >= 0) {
+                       kevent_free(k);
+               } else {
                        unsigned int hash = kevent_user_hash(&k->event);
                        struct kevent_list *l = &u->kqueue[hash];
                        
diff --git a/net/core/sock.c b/net/core/sock.c
index eed500b..3d32c5c 100644
--- a/net/core/sock.c
+++ b/net/core/sock.c
@@ -455,11 +455,16 @@ set_rcvbuf:
                        spin_unlock_bh(&sk->sk_lock.slock);
                        ret = -ENONET;
                        break;
+#ifdef CONFIG_KEVENT_SOCKET
                case SO_ASYNC_SOCK:
                        spin_lock_bh(&sk->sk_lock.slock);
-                       sk->sk_async_sock = valbool;
+                       if (valbool)
+                               set_bit(KEVENT_SOCKET_FLAGS_ASYNC, &sk->sk_kevent_flags);
+                       else
+                               clear_bit(KEVENT_SOCKET_FLAGS_ASYNC, &sk->sk_kevent_flags);
                        spin_unlock_bh(&sk->sk_lock.slock);
                        break;
+#endif
 
                /* We implement the SO_SNDLOWAT etc to
                   not be settable (1003.1g 5.3) */
@@ -1244,7 +1249,7 @@ static void sock_def_write_space(struct 
                if (sock_writeable(sk))
                        sk_wake_async(sk, 2, POLL_OUT);
        
-               kevent_socket_notify(sk, KEVENT_SOCKET_RECV);
+               kevent_socket_notify(sk, KEVENT_SOCKET_SEND);
        }
 
        read_unlock(&sk->sk_callback_lock);
diff --git a/net/core/stream.c b/net/core/stream.c
index 67ab414..745a07f 100644
--- a/net/core/stream.c
+++ b/net/core/stream.c
@@ -36,7 +36,7 @@ void sk_stream_write_space(struct sock *
                        wake_up_interruptible(sk->sk_sleep);
                if (sock->fasync_list && !(sk->sk_shutdown & SEND_SHUTDOWN))
                        sock_wake_async(sock, 2, POLL_OUT);
-               kevent_socket_notify(sk, KEVENT_SOCKET_RECV);
+               kevent_socket_notify(sk, KEVENT_SOCKET_SEND);
        }
 }
 
diff --git a/net/ipv4/tcp.c b/net/ipv4/tcp.c
index e9129c5..c2ed578 100644
--- a/net/ipv4/tcp.c
+++ b/net/ipv4/tcp.c
@@ -1093,6 +1093,121 @@ int tcp_read_sock(struct sock *sk, read_
 /*
  * Must be called with locked sock.
  */
+int tcp_async_send(struct sock *sk, struct page **pages, unsigned int poffset, size_t len)
+{
+       struct tcp_sock *tp = tcp_sk(sk);
+       int mss_now, size_goal;
+       int err = -EAGAIN;
+       ssize_t copied;
+
+       /* Wait for a connection to finish. */
+       if ((1 << sk->sk_state) & ~(TCPF_ESTABLISHED | TCPF_CLOSE_WAIT))
+               goto out_err;
+
+       clear_bit(SOCK_ASYNC_NOSPACE, &sk->sk_socket->flags);
+
+       mss_now = tcp_current_mss(sk, 1);
+       size_goal = tp->xmit_size_goal;
+       copied = 0;
+
+       err = -EPIPE;
+       if (sk->sk_err || (sk->sk_shutdown & SEND_SHUTDOWN) || sock_flag(sk, SOCK_DONE) ||
+                       (sk->sk_state == TCP_CLOSE) || (atomic_read(&sk->sk_refcnt) == 1))
+               goto do_error;
+
+       while (len > 0) {
+               struct sk_buff *skb = sk->sk_write_queue.prev;
+               struct page *page = pages[poffset / PAGE_SIZE];
+               int copy, i, can_coalesce;
+               int offset = poffset % PAGE_SIZE;
+               int size = min_t(size_t, len, PAGE_SIZE - offset);
+
+               if (!sk->sk_send_head || (copy = size_goal - skb->len) <= 0) {
+new_segment:
+                       if (!sk_stream_memory_free(sk))
+                               goto wait_for_sndbuf;
+
+                       skb = sk_stream_alloc_pskb(sk, 0, 0,
+                                                  sk->sk_allocation);
+                       if (!skb)
+                               goto wait_for_memory;
+
+                       skb_entail(sk, tp, skb);
+                       copy = size_goal;
+               }
+
+               if (copy > size)
+                       copy = size;
+
+               i = skb_shinfo(skb)->nr_frags;
+               can_coalesce = skb_can_coalesce(skb, i, page, offset);
+               if (!can_coalesce && i >= MAX_SKB_FRAGS) {
+                       tcp_mark_push(tp, skb);
+                       goto new_segment;
+               }
+               if (!sk_stream_wmem_schedule(sk, copy))
+                       goto wait_for_memory;
+               
+               if (can_coalesce) {
+                       skb_shinfo(skb)->frags[i - 1].size += copy;
+               } else {
+                       get_page(page);
+                       skb_fill_page_desc(skb, i, page, offset, copy);
+               }
+
+               skb->len += copy;
+               skb->data_len += copy;
+               skb->truesize += copy;
+               sk->sk_wmem_queued += copy;
+               sk->sk_forward_alloc -= copy;
+               skb->ip_summed = CHECKSUM_HW;
+               tp->write_seq += copy;
+               TCP_SKB_CB(skb)->end_seq += copy;
+               skb_shinfo(skb)->tso_segs = 0;
+
+               if (!copied)
+                       TCP_SKB_CB(skb)->flags &= ~TCPCB_FLAG_PSH;
+
+               copied += copy;
+               poffset += copy;
+               if (!(len -= copy))
+                       goto out;
+
+               if (skb->len < mss_now)
+                       continue;
+
+               if (forced_push(tp)) {
+                       tcp_mark_push(tp, skb);
+                       __tcp_push_pending_frames(sk, tp, mss_now, TCP_NAGLE_PUSH);
+               } else if (skb == sk->sk_send_head)
+                       tcp_push_one(sk, mss_now);
+               continue;
+
+wait_for_sndbuf:
+               set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
+wait_for_memory:
+               if (copied)
+                       tcp_push(sk, tp, 0, mss_now, TCP_NAGLE_PUSH);
+
+               err = -EAGAIN;
+               goto do_error;
+       }
+
+out:
+       if (copied)
+               tcp_push(sk, tp, 0, mss_now, tp->nonagle);
+       return copied;
+
+do_error:
+       if (copied)
+               goto out;
+out_err:
+       return sk_stream_error(sk, 0, err);
+}
+
+/*
+ * Must be called with locked sock.
+ */
 int tcp_async_recv(struct sock *sk, void *dst, size_t len)
 {
        struct tcp_sock *tp = tcp_sk(sk);
@@ -2292,6 +2407,7 @@ EXPORT_SYMBOL(tcp_ioctl);
 EXPORT_SYMBOL(tcp_poll);
 EXPORT_SYMBOL(tcp_read_sock);
 EXPORT_SYMBOL(tcp_async_recv);
+EXPORT_SYMBOL(tcp_async_send);
 EXPORT_SYMBOL(tcp_recvmsg);
 EXPORT_SYMBOL(tcp_sendmsg);
 EXPORT_SYMBOL(tcp_sendpage);
diff --git a/net/ipv4/tcp_input.c b/net/ipv4/tcp_input.c
index 8107f6e..6d652fa 100644
--- a/net/ipv4/tcp_input.c
+++ b/net/ipv4/tcp_input.c
@@ -3820,7 +3820,7 @@ int tcp_rcv_established(struct sock *sk,
                        if (tp->ucopy.task == current &&
                            tp->copied_seq == tp->rcv_nxt &&
                            len - tcp_header_len <= tp->ucopy.len &&
-                           sock_owned_by_user(sk) && !sk->sk_async_sock) {
+                           sock_owned_by_user(sk) && !sock_async(sk)) {
                                __set_current_state(TASK_RUNNING);
 
                                if (!tcp_copy_to_iovec(sk, skb, tcp_header_len)) {
diff --git a/net/ipv4/tcp_ipv4.c b/net/ipv4/tcp_ipv4.c
index 17ad358..f010be2 100644
--- a/net/ipv4/tcp_ipv4.c
+++ b/net/ipv4/tcp_ipv4.c
@@ -1248,7 +1248,7 @@ process:
 
        bh_lock_sock(sk);
        ret = 0;
-       if (sk->sk_async_sock) {
+       if (sock_async(sk)) {
                local_bh_disable();
                ret = tcp_v4_do_rcv(sk, skb);
                local_bh_enable();
@@ -1984,6 +1984,7 @@ struct proto tcp_prot = {
        .sendmsg                = tcp_sendmsg,
        .recvmsg                = tcp_recvmsg,
        .async_recv             = tcp_async_recv,
+       .async_send             = tcp_async_send,
        .backlog_rcv            = tcp_v4_do_rcv,
        .hash                   = tcp_v4_hash,
        .unhash                 = tcp_unhash,



-- 
        Evgeniy Polyakov
