Network asynchronous sending support is implemented in a zero-copy way, similar to how ->sendpage() works. Below is a benchmark of a system with 10 concurrent sockets used for sending. The receiving server and the synchronous sender use epoll() to select the active socket.

Benchmark results:

[EMAIL PROTECTED] event]# ./naio_send -a 192.168.0.48 -p 1234 -f /dev/kevent -N10 -T1 -F /dev/null
Using sync test with 10 clients.
Written 90 Mb, time 1.005239 sec, speed 89.767987 Mb/sec, CPU usage user: 0, kernel: 24.
Written 179 Mb, time 2.009292 sec, speed 89.430210 Mb/sec, CPU usage user: 0, kernel: 11.
Written 269 Mb, time 3.013318 sec, speed 89.377961 Mb/sec, CPU usage user: 0, kernel: 20.
Written 359 Mb, time 4.017380 sec, speed 89.405479 Mb/sec, CPU usage user: 0, kernel: 18.
Written 449 Mb, time 5.021395 sec, speed 89.522402 Mb/sec, CPU usage user: 0, kernel: 20.
Written 538 Mb, time 6.025448 sec, speed 89.417625 Mb/sec, CPU usage user: 1, kernel: 19.
Written 628 Mb, time 7.029475 sec, speed 89.424797 Mb/sec, CPU usage user: 0, kernel: 19.
Written 717 Mb, time 8.033543 sec, speed 89.365050 Mb/sec, CPU usage user: 1, kernel: 19.
Written 795 Mb, time 9.037552 sec, speed 88.004767 Mb/sec, CPU usage user: 0, kernel: 20.
Written 871 Mb, time 10.041601 sec, speed 86.819292 Mb/sec, CPU usage user: 1, kernel: 20.
Written 953 Mb, time 11.045655 sec, speed 86.341926 Mb/sec, CPU usage user: 1, kernel: 20.
Written 1043 Mb, time 12.049671 sec, speed 86.567457 Mb/sec, CPU usage user: 1, kernel: 20.
Written 1132 Mb, time 13.053734 sec, speed 86.786411 Mb/sec, CPU usage user: 0, kernel: 21.
Written 1216 Mb, time 14.057747 sec, speed 86.541472 Mb/sec, CPU usage user: 1, kernel: 21.
Written 1245 Mb, time 14.433538 sec, speed 86.260144 Mb/sec, CPU usage user: 1, kernel: 21.
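For reference, the synchronous sender being compared against (-T1 above) is essentially a standard epoll()-driven send() loop. The sketch below is not the naio_send source, just a minimal illustration of that pattern, assuming already-connected sockets and eliding error handling; note that every send() copies data from userspace into the socket buffer, which is exactly the copy the zero-copy async path avoids:

/*
 * Minimal sketch of an epoll()-based synchronous sender, assuming
 * already-connected TCP sockets. NOT the naio_send source; just the
 * classic pattern the async path is benchmarked against.
 */
#include <sys/epoll.h>
#include <sys/socket.h>

static char buf[4096];

static void sync_send_loop(int *fds, int nfds)
{
	struct epoll_event ev, events[16];
	int epfd = epoll_create(nfds);
	int i, n;

	for (i = 0; i < nfds; ++i) {
		ev.events = EPOLLOUT;
		ev.data.fd = fds[i];
		epoll_ctl(epfd, EPOLL_CTL_ADD, fds[i], &ev);
	}

	for (;;) {
		/* Block until some socket has send-buffer space... */
		n = epoll_wait(epfd, events, 16, -1);
		for (i = 0; i < n; ++i)
			/* ...then copy data from userspace on every call. */
			if (send(events[i].data.fd, buf, sizeof(buf), 0) < 0)
				return;
	}
}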
[EMAIL PROTECTED] event]# ./naio_send -a 192.168.0.48 -p 1234 -f /dev/kevent -N10 -T0 -F /dev/null
Using async test with 10 clients.
Written 81 Mb, time 1.005705 sec, speed 81.181391 Mb/sec, CPU usage user: 0, kernel: 5.
Written 171 Mb, time 2.009730 sec, speed 85.169634 Mb/sec, CPU usage user: 0, kernel: 4.
Written 260 Mb, time 3.013776 sec, speed 86.346984 Mb/sec, CPU usage user: 0, kernel: 5.
Written 349 Mb, time 4.017804 sec, speed 86.915872 Mb/sec, CPU usage user: 0, kernel: 6.
Written 438 Mb, time 5.021849 sec, speed 87.218871 Mb/sec, CPU usage user: 0, kernel: 5.
Written 527 Mb, time 6.025884 sec, speed 87.556526 Mb/sec, CPU usage user: 0, kernel: 5.
Written 617 Mb, time 7.029937 sec, speed 87.805285 Mb/sec, CPU usage user: 0, kernel: 6.
Written 706 Mb, time 8.033961 sec, speed 87.918766 Mb/sec, CPU usage user: 0, kernel: 6.
Written 795 Mb, time 9.038002 sec, speed 88.015944 Mb/sec, CPU usage user: 0, kernel: 6.
Written 884 Mb, time 10.042040 sec, speed 88.119001 Mb/sec, CPU usage user: 0, kernel: 6.
Written 974 Mb, time 11.046080 sec, speed 88.178199 Mb/sec, CPU usage user: 0, kernel: 6.
Written 1063 Mb, time 12.050118 sec, speed 88.261260 Mb/sec, CPU usage user: 0, kernel: 6.
Written 1148 Mb, time 13.054159 sec, speed 87.956583 Mb/sec, CPU usage user: 0, kernel: 6.
Written 1224 Mb, time 14.058228 sec, speed 87.097292 Mb/sec, CPU usage user: 0, kernel: 6.
Written 1313 Mb, time 15.062235 sec, speed 87.200186 Mb/sec, CPU usage user: 0, kernel: 6.
Written 1402 Mb, time 16.066306 sec, speed 87.307377 Mb/sec, CPU usage user: 0, kernel: 6.
Written 1447 Mb, time 16.580100 sec, speed 87.278716 Mb/sec, CPU usage user: 0, kernel: 6.

Picture attached.

Patches, userspace utilities, and design and implementation notes can be found on the project's homepage:
http://tservice.net.ru/~s0mbre/old/?section=projects&item=naio

Signed-off-by: Evgeniy Polyakov <[EMAIL PROTECTED]>
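---
For illustration, here is a hypothetical sketch of how one asynchronous send could be queued from userspace. This is not the real ABI: the authoritative ukevent layout and submission call ship with the userspace utilities on the project homepage, struct ukevent_sketch and kevent_ctl_add() below are placeholder names, and only the field usage (id.raw[0] = fd, id.raw[1] = size, ptr = buffer, event = KEVENT_SOCKET_SEND) follows kevent_naio_enqueue() in the patch below.

/*
 * Hypothetical sketch only. SO_ASYNC_SOCK and KEVENT_SOCKET_SEND come from
 * the patched kernel's headers; struct ukevent_sketch stands in for the
 * real struct ukevent, and kevent_ctl_add() is a placeholder for the
 * submission call behind /dev/kevent.
 */
#include <string.h>
#include <sys/socket.h>
#include <linux/kevent.h>		/* patched tree only */

struct ukevent_sketch {			/* stand-in for struct ukevent */
	unsigned int	event;		/* KEVENT_SOCKET_SEND */
	unsigned int	id_raw[2];	/* [0] = socket fd, [1] = total size */
	void		*ptr;		/* user buffer; kernel pins it with get_user_pages() */
};

/* Placeholder for the real submission call behind /dev/kevent. */
extern int kevent_ctl_add(int kevent_fd, struct ukevent_sketch *uk, unsigned int num);

static int queue_async_send(int kevent_fd, int sock_fd, void *buf, unsigned int size)
{
	struct ukevent_sketch uk;
	int one = 1;

	/* Switch the socket into kevent-async mode first. */
	if (setsockopt(sock_fd, SOL_SOCKET, SO_ASYNC_SOCK, &one, sizeof(one)))
		return -1;

	memset(&uk, 0, sizeof(uk));
	uk.event = KEVENT_SOCKET_SEND;
	uk.id_raw[0] = sock_fd;		/* which socket */
	uk.id_raw[1] = size;		/* how many bytes */
	uk.ptr = buf;			/* kernel sends straight out of these pages */

	return kevent_ctl_add(kevent_fd, &uk, 1);
}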
diff --git a/include/linux/kevent.h b/include/linux/kevent.h
index 8cf83dc..b219633 100644
--- a/include/linux/kevent.h
+++ b/include/linux/kevent.h
@@ -65,6 +65,7 @@ enum {
 enum {
 	KEVENT_SOCKET_RECV = 0x1,
 	KEVENT_SOCKET_ACCEPT = 0x2,
+	KEVENT_SOCKET_SEND = 0x4,
 };
 
 /*
@@ -243,13 +244,21 @@ static inline void kevent_inode_remove(s
 }
 #endif /* CONFIG_KEVENT_INODE */
 #ifdef CONFIG_KEVENT_SOCKET
+
+enum {
+	KEVENT_SOCKET_FLAGS_ASYNC = 0,
+	KEVENT_SOCKET_FLAGS_INUSE,
+};
+
 void kevent_socket_notify(struct sock *sock, u32 event);
 int kevent_socket_dequeue(struct kevent *k);
 int kevent_socket_enqueue(struct kevent *k);
+#define sock_async(__sk)	test_bit(KEVENT_SOCKET_FLAGS_ASYNC, &__sk->sk_kevent_flags)
 #else
 static inline void kevent_socket_notify(struct sock *sock, u32 event)
 {
 }
+#define sock_async(__sk)	0
 #endif
 #endif /* __KERNEL__ */
 #endif /* __KEVENT_H */
diff --git a/include/net/sock.h b/include/net/sock.h
index c086188..d571c3f 100644
--- a/include/net/sock.h
+++ b/include/net/sock.h
@@ -213,7 +213,9 @@ struct sock {
 	int			sk_route_caps;
 	unsigned long		sk_flags;
 	unsigned long		sk_lingertime;
-	unsigned long		sk_async_sock;
+#ifdef CONFIG_KEVENT_SOCKET
+	unsigned long		sk_kevent_flags;
+#endif
 	/*
 	 * The backlog queue is special, it is always used with
 	 * the per-socket spinlock held and requires low latency
@@ -553,6 +555,9 @@ struct proto {
 	int			(*async_recv) (struct sock *sk,
 					void *dst, size_t size);
+	int			(*async_send) (struct sock *sk,
+					struct page **pages, unsigned int poffset,
+					size_t size);
 
 	/* Keeping track of sk's, looking them up, and port selection methods.
 	 */
 	void			(*hash)(struct sock *sk);
diff --git a/include/net/tcp.h b/include/net/tcp.h
index e14e48d..8788625 100644
--- a/include/net/tcp.h
+++ b/include/net/tcp.h
@@ -393,6 +393,7 @@ extern int			tcp_setsockopt(struct sock
 						int optlen);
 extern void			tcp_set_keepalive(struct sock *sk, int val);
 extern int			tcp_async_recv(struct sock *sk, void *dst, size_t size);
+extern int			tcp_async_send(struct sock *sk, struct page **pages, unsigned int poffset, size_t size);
 extern int			tcp_recvmsg(struct kiocb *iocb, struct sock *sk,
 						struct msghdr *msg,
 						size_t len, int nonblock,
diff --git a/kernel/kevent/kevent.c b/kernel/kevent/kevent.c
index ef08aa7..5c3a141 100644
--- a/kernel/kevent/kevent.c
+++ b/kernel/kevent/kevent.c
@@ -222,7 +222,7 @@ void kevent_requeue(struct kevent *k)
  */
 void kevent_storage_ready(struct kevent_storage *st,
 		kevent_callback_t ready_callback, u32 event)
 {
-	unsigned long flags;
+	//unsigned long flags;
 	struct kevent *k, *n;
 	unsigned int qlen;
 	u32 ev = 0;
diff --git a/kernel/kevent/kevent_naio.c b/kernel/kevent/kevent_naio.c
index 477cbb1..cee289f 100644
--- a/kernel/kevent/kevent_naio.c
+++ b/kernel/kevent/kevent_naio.c
@@ -47,7 +47,7 @@ static int kevent_naio_enqueue(struct ke
 	unsigned int size = k->event.id.raw[1];
 	int num = size/PAGE_SIZE + 1;
 	struct file *file;
-	struct sock *sk;
+	struct sock *sk = NULL;
 	int fput_needed;
 
 	file = fget_light(k->event.id.raw[0], &fput_needed);
@@ -61,16 +61,18 @@ static int kevent_naio_enqueue(struct ke
 	sk = SOCKET_I(file->f_dentry->d_inode)->sk;
 
 	err = -ESOCKTNOSUPPORT;
-	if (!sk || !sk->sk_prot->async_recv)
+	if (!sk || !sk->sk_prot->async_recv || !sk->sk_prot->async_send)
 		goto err_out_fput;
-	
+
 	page = kmalloc(sizeof(struct page *) * num, GFP_KERNEL);
 	if (!page)
 		return -ENOMEM;
 
 	addr = k->event.ptr;
+	down_read(&current->mm->mmap_sem);
 	err = get_user_pages(current, current->mm, (unsigned long)addr,
 			num, 1, 0, page, NULL);
+	up_read(&current->mm->mmap_sem);
 	if (err <= 0)
 		goto err_out_free;
 	num = err;
@@ -80,12 +82,10 @@ static int kevent_naio_enqueue(struct ke
 	k->priv = page;
 
 	err = kevent_socket_enqueue(k);
-	if (err < 0)
+	if (err)
 		goto err_out_put_pages;
 
-	err = kevent_naio_callback(k);
-	if (err < 0)
-		goto err_out_put_pages;
+
 	fput_light(file, fput_needed);
 
 	return err;
@@ -125,40 +125,47 @@ static int kevent_naio_callback(struct k
 	unsigned int size = k->event.id.raw[1];
 	unsigned int off = k->event.ret_data[1];
 	struct page **pages = k->priv, *page;
-	int ready = 0, num = 0, err = 0;
+	int ready = 0, num = off/PAGE_SIZE, err = 0, send = 0;
 	void *ptr, *optr;
 	unsigned int len;
 
-	while (size) {
-		num = off / PAGE_SIZE;
+	if (!test_bit(KEVENT_SOCKET_FLAGS_ASYNC, &sk->sk_kevent_flags))
+		return -1;
 
+	if (k->event.event & KEVENT_SOCKET_SEND)
+		send = 1;
+	else if (!(k->event.event & KEVENT_SOCKET_RECV))
+		return -EINVAL;
+
+	/*
+	 * sk_prot->async_*() can return either number of bytes processed,
+	 * or negative error value, or zero if socket is closed.
+	 */
+
+	if (!send) {
 		page = pages[num];
 		optr = ptr = kmap_atomic(page, KM_IRQ0);
-		if (!ptr) {
-			err = -ENOMEM;
-			break;
-		}
+		if (!ptr)
+			return -ENOMEM;
 
 		ptr += off % PAGE_SIZE;
 		len = min_t(unsigned int, PAGE_SIZE - (ptr - optr), size);
 
-		/*
-		 * sk_prot->async_recv() can return either number of bytes read,
-		 * or negative error value, or zero if socket is closed.
-		 */
 		err = sk->sk_prot->async_recv(sk, ptr, len);
 
 		kunmap_atomic(optr, KM_IRQ0);
+	} else {
+		len = size;
+		//bh_lock_sock(sk);
+		err = sk->sk_prot->async_send(sk, pages, off, size);
+		//bh_unlock_sock(sk);
+	}
 
-		if (err > 0) {
-			num++;
-			size -= err;
-			off += err;
-		}
-
-		if (err != len)
-			break;
+	if (err > 0) {
+		num++;
+		size -= err;
+		off += err;
 	}
 
 	k->event.ret_data[1] = off;
@@ -169,6 +176,10 @@ static int kevent_naio_callback(struct k
 	if (!size)
 		ready = 1;
+#if 0
+	printk("%s: sk=%p, k=%p, size=%4u, off=%4u, err=%3d, ready=%1d.\n",
+			__func__, sk, k, size, off, err, ready);
+#endif
 
 	return ready;
 }
diff --git a/kernel/kevent/kevent_socket.c b/kernel/kevent/kevent_socket.c
index fd74f3b..d179ca8 100644
--- a/kernel/kevent/kevent_socket.c
+++ b/kernel/kevent/kevent_socket.c
@@ -81,11 +81,15 @@ int kevent_socket_enqueue(struct kevent
 	if (err < 0)
 		goto err_out_iput;
 
-	err = kevent_socket_callback(k);
+	err = k->callback(k);
+	if (err)
+		goto err_out_dequeue;
 
 	fput_light(file, fput_needed);
 	return err;
 
+err_out_dequeue:
+	kevent_storage_dequeue(k->st, k);
 err_out_iput:
 	iput(inode);
 err_out_fput:
@@ -113,6 +117,8 @@ int kevent_init_socket(struct kevent *k)
 
 void kevent_socket_notify(struct sock *sk, u32 event)
 {
-	if (sk->sk_socket)
+	if (sk->sk_socket && !test_and_set_bit(KEVENT_SOCKET_FLAGS_INUSE, &sk->sk_kevent_flags)) {
 		kevent_storage_ready(&SOCK_INODE(sk->sk_socket)->st, NULL, event);
+		clear_bit(KEVENT_SOCKET_FLAGS_INUSE, &sk->sk_kevent_flags);
+	}
 }
diff --git a/kernel/kevent/kevent_user.c b/kernel/kevent/kevent_user.c
index ae776c5..e197786 100644
--- a/kernel/kevent/kevent_user.c
+++ b/kernel/kevent/kevent_user.c
@@ -331,6 +331,10 @@ static int kevent_user_ctl_add(struct ke
 	orig = (void __user *)arg;
 	ctl_addr = (void __user *)(arg - sizeof(struct kevent_user_control));
 
+	err = -ENFILE;
+	if (u->kevent_num + ctl->num >= 1000)
+		goto err_out_remove;
+
 	for (i=0; i<ctl->num; ++i) {
 		k = kevent_alloc(GFP_KERNEL);
 		if (!k) {
@@ -359,18 +363,18 @@ static int kevent_user_ctl_add(struct ke
 
 		err = kevent_enqueue(k);
 		if (err) {
+			if (err < 0)
+				k->event.ret_flags |= KEVENT_RET_BROKEN;
+			k->event.ret_flags |= KEVENT_RET_DONE;
 			if (copy_to_user(orig, &k->event, sizeof(struct ukevent)))
 				cerr = -EINVAL;
 			orig += sizeof(struct ukevent);
-			if (err < 0)
-				kevent_free(k);
 			num++;
 			k->event.ret_flags = 0;
-		}
-
-		if (err >= 0) {
+			kevent_free(k);
+		} else {
 			unsigned int hash = kevent_user_hash(&k->event);
 			struct kevent_list *l = &u->kqueue[hash];
diff --git a/net/core/sock.c b/net/core/sock.c
index eed500b..3d32c5c 100644
--- a/net/core/sock.c
+++ b/net/core/sock.c
@@ -455,11 +455,16 @@ set_rcvbuf:
 			spin_unlock_bh(&sk->sk_lock.slock);
 			ret = -ENONET;
 			break;
+#ifdef CONFIG_KEVENT_SOCKET
 		case SO_ASYNC_SOCK:
 			spin_lock_bh(&sk->sk_lock.slock);
-			sk->sk_async_sock = valbool;
+			if (valbool)
+				set_bit(KEVENT_SOCKET_FLAGS_ASYNC, &sk->sk_kevent_flags);
+			else
+				clear_bit(KEVENT_SOCKET_FLAGS_ASYNC, &sk->sk_kevent_flags);
 			spin_unlock_bh(&sk->sk_lock.slock);
 			break;
+#endif
 
 		/* We implement the SO_SNDLOWAT etc to not be settable (1003.1g 5.3) */
@@ -1244,7 +1249,7 @@ static void sock_def_write_space(struct
 		if (sock_writeable(sk))
 			sk_wake_async(sk, 2, POLL_OUT);
 
-		kevent_socket_notify(sk, KEVENT_SOCKET_RECV);
+		kevent_socket_notify(sk, KEVENT_SOCKET_SEND);
 	}
 
 	read_unlock(&sk->sk_callback_lock);
diff --git a/net/core/stream.c b/net/core/stream.c
index 67ab414..745a07f 100644
--- a/net/core/stream.c
+++ b/net/core/stream.c
@@ -36,7 +36,7 @@ void sk_stream_write_space(struct sock *
 			wake_up_interruptible(sk->sk_sleep);
 		if (sock->fasync_list && !(sk->sk_shutdown & SEND_SHUTDOWN))
 			sock_wake_async(sock, 2, POLL_OUT);
-		kevent_socket_notify(sk, KEVENT_SOCKET_RECV);
+		kevent_socket_notify(sk, KEVENT_SOCKET_SEND);
 	}
 }
diff --git a/net/ipv4/tcp.c b/net/ipv4/tcp.c
index e9129c5..c2ed578 100644
--- a/net/ipv4/tcp.c
+++ b/net/ipv4/tcp.c
@@ -1093,6 +1093,121 @@ int tcp_read_sock(struct sock *sk, read_
 /*
  * Must be called with locked sock.
  */
+int tcp_async_send(struct sock *sk, struct page **pages, unsigned int poffset, size_t len)
+{
+	struct tcp_sock *tp = tcp_sk(sk);
+	int mss_now, size_goal;
+	int err = -EAGAIN;
+	ssize_t copied;
+
+	/* Wait for a connection to finish. */
+	if ((1 << sk->sk_state) & ~(TCPF_ESTABLISHED | TCPF_CLOSE_WAIT))
+		goto out_err;
+
+	clear_bit(SOCK_ASYNC_NOSPACE, &sk->sk_socket->flags);
+
+	mss_now = tcp_current_mss(sk, 1);
+	size_goal = tp->xmit_size_goal;
+	copied = 0;
+
+	err = -EPIPE;
+	if (sk->sk_err || (sk->sk_shutdown & SEND_SHUTDOWN) || sock_flag(sk, SOCK_DONE) ||
+			(sk->sk_state == TCP_CLOSE) || (atomic_read(&sk->sk_refcnt) == 1))
+		goto do_error;
+
+	while (len > 0) {
+		struct sk_buff *skb = sk->sk_write_queue.prev;
+		struct page *page = pages[poffset / PAGE_SIZE];
+		int copy, i, can_coalesce;
+		int offset = poffset % PAGE_SIZE;
+		int size = min_t(size_t, len, PAGE_SIZE - offset);
+
+		if (!sk->sk_send_head || (copy = size_goal - skb->len) <= 0) {
+new_segment:
+			if (!sk_stream_memory_free(sk))
+				goto wait_for_sndbuf;
+
+			skb = sk_stream_alloc_pskb(sk, 0, 0,
+					sk->sk_allocation);
+			if (!skb)
+				goto wait_for_memory;
+
+			skb_entail(sk, tp, skb);
+			copy = size_goal;
+		}
+
+		if (copy > size)
+			copy = size;
+
+		i = skb_shinfo(skb)->nr_frags;
+		can_coalesce = skb_can_coalesce(skb, i, page, offset);
+		if (!can_coalesce && i >= MAX_SKB_FRAGS) {
+			tcp_mark_push(tp, skb);
+			goto new_segment;
+		}
+		if (!sk_stream_wmem_schedule(sk, copy))
+			goto wait_for_memory;
+
+		if (can_coalesce) {
+			skb_shinfo(skb)->frags[i - 1].size += copy;
+		} else {
+			get_page(page);
+			skb_fill_page_desc(skb, i, page, offset, copy);
+		}
+
+		skb->len += copy;
+		skb->data_len += copy;
+		skb->truesize += copy;
+		sk->sk_wmem_queued += copy;
+		sk->sk_forward_alloc -= copy;
+		skb->ip_summed = CHECKSUM_HW;
+		tp->write_seq += copy;
+		TCP_SKB_CB(skb)->end_seq += copy;
+		skb_shinfo(skb)->tso_segs = 0;
+
+		if (!copied)
+			TCP_SKB_CB(skb)->flags &= ~TCPCB_FLAG_PSH;
+
+		copied += copy;
+		poffset += copy;
+		if (!(len -= copy))
+			goto out;
+
+		if (skb->len < mss_now)
+			continue;
+
+		if (forced_push(tp)) {
+			tcp_mark_push(tp, skb);
+			__tcp_push_pending_frames(sk, tp, mss_now, TCP_NAGLE_PUSH);
+		} else if (skb == sk->sk_send_head)
+			tcp_push_one(sk, mss_now);
+		continue;
+
+wait_for_sndbuf:
+		set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
+wait_for_memory:
+		if (copied)
+			tcp_push(sk, tp, 0, mss_now, TCP_NAGLE_PUSH);
+
+		err = -EAGAIN;
+		goto do_error;
+	}
+
+out:
+	if (copied)
+		tcp_push(sk, tp, 0, mss_now, tp->nonagle);
+	return copied;
+
+do_error:
+	if (copied)
+		goto out;
+out_err:
+	return sk_stream_error(sk, 0, err);
+}
+
+/*
+ * Must be called with locked sock.
+ */
 int tcp_async_recv(struct sock *sk, void *dst, size_t len)
 {
 	struct tcp_sock *tp = tcp_sk(sk);
@@ -2292,6 +2407,7 @@ EXPORT_SYMBOL(tcp_ioctl);
 EXPORT_SYMBOL(tcp_poll);
 EXPORT_SYMBOL(tcp_read_sock);
 EXPORT_SYMBOL(tcp_async_recv);
+EXPORT_SYMBOL(tcp_async_send);
 EXPORT_SYMBOL(tcp_recvmsg);
 EXPORT_SYMBOL(tcp_sendmsg);
 EXPORT_SYMBOL(tcp_sendpage);
diff --git a/net/ipv4/tcp_input.c b/net/ipv4/tcp_input.c
index 8107f6e..6d652fa 100644
--- a/net/ipv4/tcp_input.c
+++ b/net/ipv4/tcp_input.c
@@ -3820,7 +3820,7 @@ int tcp_rcv_established(struct sock *sk,
 		if (tp->ucopy.task == current &&
 		    tp->copied_seq == tp->rcv_nxt &&
 		    len - tcp_header_len <= tp->ucopy.len &&
-		    sock_owned_by_user(sk) && !sk->sk_async_sock) {
+		    sock_owned_by_user(sk) && !sock_async(sk)) {
 			__set_current_state(TASK_RUNNING);
 
 			if (!tcp_copy_to_iovec(sk, skb, tcp_header_len)) {
diff --git a/net/ipv4/tcp_ipv4.c b/net/ipv4/tcp_ipv4.c
index 17ad358..f010be2 100644
--- a/net/ipv4/tcp_ipv4.c
+++ b/net/ipv4/tcp_ipv4.c
@@ -1248,7 +1248,7 @@ process:
 	bh_lock_sock(sk);
 	ret = 0;
-	if (sk->sk_async_sock) {
+	if (sock_async(sk)) {
 		local_bh_disable();
 		ret = tcp_v4_do_rcv(sk, skb);
 		local_bh_enable();
@@ -1984,6 +1984,7 @@ struct proto tcp_prot = {
 	.sendmsg		= tcp_sendmsg,
 	.recvmsg		= tcp_recvmsg,
 	.async_recv		= tcp_async_recv,
+	.async_send		= tcp_async_send,
 	.backlog_rcv		= tcp_v4_do_rcv,
 	.hash			= tcp_v4_hash,
 	.unhash			= tcp_unhash,

-- 
	Evgeniy Polyakov
[Attachment: send_naio_vs_sync.png (PNG image)]