On 09/01/2024 21:38, Alexey Kuznetsov wrote:
Observation is that af_unix sockets today became slower
and eat a lot of more cpu than 100G ethernet. So, implement
MSG_ZEROCOPY over af_unix sockets to be able to talk to
local services without collapse of performance.
Unexpectedly, this makes sense! F.e. zerocopy cannot be done
in TCP over loopback, because skbs when passing over loopback
change ownership. But unix sockets traditionally implemented
in the way that skbs queued on destination sockets still preserve
ownership of source socket, so that completion notifications
can be delivered back to sender without problems.
Code looks neat and clean and might be submitted to mainstream
one day. A little bit of dirt is that it shares IP_RECVERR
control message, but rather this dirt is not mine, but
by original author, who used protocol level message for generic
socket level notifications without any good reasons, creating
a mess for users.
Signed-off-by: Alexey Kuznetsov <kuz...@acronis.com>
---
net/core/datagram.c | 2 +-
net/core/sock.c | 3 ++
net/unix/af_unix.c | 108 +++++++++++++++++++++++++++++++++++++++++++++-------
3 files changed, 98 insertions(+), 15 deletions(-)
diff --git a/net/core/datagram.c b/net/core/datagram.c
index 8eb5999..980248b 100644
--- a/net/core/datagram.c
+++ b/net/core/datagram.c
@@ -640,7 +640,7 @@ int __zerocopy_sg_from_iter(struct sock *sk, struct sk_buff
*skb,
skb->data_len += copied;
skb->len += copied;
skb->truesize += truesize;
- if (sk && sk->sk_type == SOCK_STREAM) {
+ if (sk && sk->sk_type == SOCK_STREAM && sk->sk_family !=
PF_UNIX) {
sk_wmem_queued_add(sk, truesize);
sk_mem_charge(sk, truesize);
} else {
diff --git a/net/core/sock.c b/net/core/sock.c
index 5d7e16a..fe8e102 100644
--- a/net/core/sock.c
+++ b/net/core/sock.c
@@ -1405,6 +1405,9 @@ int sk_setsockopt(struct sock *sk, int level, int optname,
(sk->sk_type == SOCK_DGRAM &&
sk->sk_protocol == IPPROTO_UDP)))
ret = -EOPNOTSUPP;
+ } else if (sk->sk_family == PF_UNIX) {
+ if (sk->sk_type == SOCK_DGRAM)
+ ret = -EOPNOTSUPP;
Likely we don't want to enable zerocopy for SOCK_SEQPACKET. Maybe change
to: "if (sk->sk_type != SOCK_STREAM)"
} else if (sk->sk_family != PF_RDS) {
ret = -EOPNOTSUPP;
}
diff --git a/net/unix/af_unix.c b/net/unix/af_unix.c
index 36c3cb9..e32fe47 100644
--- a/net/unix/af_unix.c
+++ b/net/unix/af_unix.c
@@ -115,6 +115,7 @@
#include <linux/freezer.h>
#include <linux/file.h>
#include <linux/btf_ids.h>
+#include <linux/errqueue.h>
#include "scm.h"
@@ -2054,6 +2055,7 @@ static int unix_stream_sendmsg(struct socket *sock, struct msghdr *msg,
{
struct sock *sk = sock->sk;
struct sock *other = NULL;
+ struct ubuf_info *uarg = NULL;
int err, size;
struct sk_buff *skb;
int sent = 0;
@@ -2083,22 +2085,37 @@ static int unix_stream_sendmsg(struct socket *sock,
struct msghdr *msg,
if (sk->sk_shutdown & SEND_SHUTDOWN)
goto pipe_err;
+ if ((msg->msg_flags & MSG_ZEROCOPY) && len && sock_flag(sk, SOCK_ZEROCOPY)) {
+ uarg = msg_zerocopy_alloc(sk, len);
+ if (!uarg) {
+ err = -ENOBUFS;
+ goto out_err;
+ }
+ }
+
while (sent < len) {
size = len - sent;
- /* Keep two messages in the pipe so it schedules better */
- size = min_t(int, size, (sk->sk_sndbuf >> 1) - 64);
+ if (!uarg) {
+ /* Keep two messages in the pipe so it schedules better
*/
+ size = min_t(int, size, (sk->sk_sndbuf >> 1) - 64);
+
+ /* allow fallback to order-0 allocations */
+ size = min_t(int, size, SKB_MAX_HEAD(0) +
UNIX_SKB_FRAGS_SZ);
- /* allow fallback to order-0 allocations */
- size = min_t(int, size, SKB_MAX_HEAD(0) + UNIX_SKB_FRAGS_SZ);
+ data_len = max_t(int, 0, size - SKB_MAX_HEAD(0));
- data_len = max_t(int, 0, size - SKB_MAX_HEAD(0));
+ data_len = min_t(size_t, size, PAGE_ALIGN(data_len));
- data_len = min_t(size_t, size, PAGE_ALIGN(data_len));
+ skb = sock_alloc_send_pskb(sk, size - data_len,
data_len,
+ msg->msg_flags & MSG_DONTWAIT,
&err,
+
get_order(UNIX_SKB_FRAGS_SZ));
+ } else {
+ size = min_t(int, size, sk->sk_sndbuf);
+ skb = sock_alloc_send_pskb(sk, 0, 0,
+ msg->msg_flags & MSG_DONTWAIT,
&err, 0);
+ }
- skb = sock_alloc_send_pskb(sk, size - data_len, data_len,
- msg->msg_flags & MSG_DONTWAIT, &err,
- get_order(UNIX_SKB_FRAGS_SZ));
if (!skb)
goto out_err;
@@ -2110,10 +2127,18 @@ static int unix_stream_sendmsg(struct socket *sock, struct msghdr *msg,
}
fds_sent = true;
- skb_put(skb, size - data_len);
- skb->data_len = data_len;
- skb->len = size;
- err = skb_copy_datagram_from_iter(skb, 0, &msg->msg_iter, size);
+ if (!uarg) {
+ skb_put(skb, size - data_len);
+ skb->data_len = data_len;
+ skb->len = size;
+ err = skb_copy_datagram_from_iter(skb, 0,
&msg->msg_iter, size);
+ } else {
+ err = skb_zerocopy_iter_stream(sk, skb, msg, size,
uarg);
+ if (err > 0) {
+ size = err;
+ err = 0;
+ }
+ }
if (err) {
kfree_skb(skb);
goto out_err;
@@ -2133,6 +2158,7 @@ static int unix_stream_sendmsg(struct socket *sock,
struct msghdr *msg,
sent += size;
}
+ net_zcopy_put(uarg);
scm_destroy(&scm);
return sent;
@@ -2145,6 +2171,7 @@ static int unix_stream_sendmsg(struct socket *sock,
struct msghdr *msg,
send_sig(SIGPIPE, current, 0);
err = -EPIPE;
out_err:
+ net_zcopy_put_abort(uarg, true);
scm_destroy(&scm);
return sent ? : err;
}
@@ -2725,6 +2752,46 @@ static int unix_stream_read_generic(struct
unix_stream_read_state *state,
return copied ? : err;
}
+static int unix_recv_error(struct sock *sk, struct msghdr *msg, int len)
+{
+ struct sock_exterr_skb *serr;
+ struct sk_buff *skb;
+ struct {
+ struct sock_extended_err ee;
+ } errhdr;
+ int err;
+ int copied;
+
+ err = -EAGAIN;
+ skb = sock_dequeue_err_skb(sk);
+ if (!skb)
+ goto out;
+
+ copied = skb->len;
+ if (copied > len) {
+ msg->msg_flags |= MSG_TRUNC;
+ copied = len;
+ }
+ err = skb_copy_datagram_msg(skb, 0, msg, copied);
+ if (unlikely(err)) {
+ kfree_skb(skb);
+ return err;
+ }
+
+ serr = SKB_EXT_ERR(skb);
+
+ memcpy(&errhdr.ee, &serr->ee, sizeof(struct sock_extended_err));
+
+ put_cmsg(msg, SOL_IP, IP_RECVERR, sizeof(errhdr), &errhdr);
+
+ msg->msg_flags |= MSG_ERRQUEUE;
+ err = copied;
+
+ consume_skb(skb);
+out:
+ return err;
+}
+
static int unix_stream_read_actor(struct sk_buff *skb,
int skip, int chunk,
struct unix_stream_read_state *state)
@@ -2769,6 +2836,10 @@ static int unix_stream_recvmsg(struct socket *sock,
struct msghdr *msg,
return prot->recvmsg(sk, msg, size, flags & MSG_DONTWAIT,
flags & ~MSG_DONTWAIT, NULL);
#endif
+
+ if (unlikely(flags & MSG_ERRQUEUE))
+ return unix_recv_error(sk, msg, size);
In case of MSG_PEEK it might be not what user wants to receive zero-copy
notification error message.
+
return unix_stream_read_generic(&state, true);
}
@@ -2776,6 +2847,15 @@ static int unix_stream_splice_actor(struct sk_buff *skb,
int skip, int chunk,
struct unix_stream_read_state *state)
{
+ /* Zerocopy pages cannot be spliced, alas. It looks like splice
interface
+ * gives no way to notify about actual page consumption. So, we have to
copy.
+ * This path is not going be legit, sender will be notified and will
stop zerocopying.
+ */
+ int err = skb_orphan_frags_rx(skb, GFP_KERNEL);
+
I prefer not to put function call on the same line with variable
declaration, especially when this function requires error checking. This
way codding style can both confirm to a) having newline between
variables and other code and b) checking function return on the next
line without extra newline in between.
+ if (err)
+ return err;
+
return skb_splice_bits(skb, state->socket->sk,
UNIXCB(skb).consumed + skip,
state->pipe, chunk, state->splice_flags);
@@ -2962,7 +3042,7 @@ static __poll_t unix_poll(struct file *file, struct
socket *sock, poll_table *wa
mask = 0;
/* exceptional events? */
- if (sk->sk_err)
+ if (sk->sk_err || !skb_queue_empty_lockless(&sk->sk_error_queue))
mask |= EPOLLERR;
if (sk->sk_shutdown == SHUTDOWN_MASK)
mask |= EPOLLHUP;
--
Best regards, Tikhomirov Pavel
Senior Software Developer, Virtuozzo.
_______________________________________________
Devel mailing list
Devel@openvz.org
https://lists.openvz.org/mailman/listinfo/devel