Refactor the read operations with no functional changes. tcp_bpf has two read paths: strparser and non-strparser. Currently, the differences between them are implemented directly in their respective recvmsg functions, which works fine. However, upcoming splice support would require duplicating the same logic for both paths. To avoid this, extract the strparser-specific differences into an independent abstraction that can be reused by splice.
For ingress_msg data processing, introduce a function pointer callback approach. The current implementation passes sk_msg_recvmsg_actor(), which performs copy_page_to_iter() - the same copy logic previously embedded in sk_msg_recvmsg(). This provides the extension point for future splice support, where a different actor can be plugged in. Signed-off-by: Jiayuan Chen <[email protected]> --- include/linux/skmsg.h | 12 ++++- net/core/skmsg.c | 24 +++++++-- net/ipv4/tcp_bpf.c | 123 ++++++++++++++++++++++++++++-------------- 3 files changed, 112 insertions(+), 47 deletions(-) diff --git a/include/linux/skmsg.h b/include/linux/skmsg.h index 19f4f253b4f9..d53756914b2f 100644 --- a/include/linux/skmsg.h +++ b/include/linux/skmsg.h @@ -143,8 +143,16 @@ int sk_msg_memcopy_from_iter(struct sock *sk, struct iov_iter *from, struct sk_msg *msg, u32 bytes); int sk_msg_recvmsg(struct sock *sk, struct sk_psock *psock, struct msghdr *msg, int len, int flags); -int __sk_msg_recvmsg(struct sock *sk, struct sk_psock *psock, struct msghdr *msg, - int len, int flags, int *copied_from_self); +typedef int (*sk_msg_read_actor_t)(void *arg, struct page *page, + unsigned int offset, size_t len); +/* Core function for reading ingress_msg, dispatches to the given actor */ +int sk_msg_read_core(struct sock *sk, struct sk_psock *psock, + size_t len, int flags, + sk_msg_read_actor_t actor, void *actor_arg, + int *copied_from_self); +int sk_msg_recvmsg_actor(void *arg, struct page *page, + unsigned int offset, size_t len); + bool sk_msg_is_readable(struct sock *sk); static inline void sk_msg_check_to_free(struct sk_msg *msg, u32 i, u32 bytes) diff --git a/net/core/skmsg.c b/net/core/skmsg.c index 2e26174c9919..6a906bfe3aa4 100644 --- a/net/core/skmsg.c +++ b/net/core/skmsg.c @@ -409,10 +409,12 @@ int sk_msg_memcopy_from_iter(struct sock *sk, struct iov_iter *from, } EXPORT_SYMBOL_GPL(sk_msg_memcopy_from_iter); -int __sk_msg_recvmsg(struct sock *sk, struct sk_psock *psock, struct msghdr *msg, - int 
len, int flags, int *copied_from_self) +/* Core function for reading ingress_msg, dispatches to the given actor */ +int sk_msg_read_core(struct sock *sk, struct sk_psock *psock, + size_t len, int flags, + sk_msg_read_actor_t actor, void *actor_arg, + int *copied_from_self) { - struct iov_iter *iter = &msg->msg_iter; int peek = flags & MSG_PEEK; struct sk_msg *msg_rx; int i, copied = 0; @@ -440,7 +442,8 @@ int __sk_msg_recvmsg(struct sock *sk, struct sk_psock *psock, struct msghdr *msg if (copied + copy > len) copy = len - copied; if (copy) - copy = copy_page_to_iter(page, sge->offset, copy, iter); + copy = actor(actor_arg, page, + sge->offset, copy); if (!copy) { copied = copied ? copied : -EFAULT; goto out; @@ -495,12 +498,23 @@ int __sk_msg_recvmsg(struct sock *sk, struct sk_psock *psock, struct msghdr *msg out: return copied; } +EXPORT_SYMBOL_GPL(sk_msg_read_core); + +int sk_msg_recvmsg_actor(void *arg, struct page *page, + unsigned int offset, size_t len) +{ + struct msghdr *msg = arg; + + return copy_page_to_iter(page, offset, len, &msg->msg_iter); +} +EXPORT_SYMBOL_GPL(sk_msg_recvmsg_actor); /* Receive sk_msg from psock->ingress_msg to @msg. */ int sk_msg_recvmsg(struct sock *sk, struct sk_psock *psock, struct msghdr *msg, int len, int flags) { - return __sk_msg_recvmsg(sk, psock, msg, len, flags, NULL); + return sk_msg_read_core(sk, psock, len, flags, + sk_msg_recvmsg_actor, msg, NULL); } EXPORT_SYMBOL_GPL(sk_msg_recvmsg); diff --git a/net/ipv4/tcp_bpf.c b/net/ipv4/tcp_bpf.c index c449a044895e..606c2b079f86 100644 --- a/net/ipv4/tcp_bpf.c +++ b/net/ipv4/tcp_bpf.c @@ -218,31 +218,26 @@ static bool is_next_msg_fin(struct sk_psock *psock) return false; } -static int tcp_bpf_recvmsg_parser(struct sock *sk, - struct msghdr *msg, - size_t len, - int flags, - int *addr_len) +/* + * __tcp_bpf_recvmsg_parser - inner recvmsg for strparser path + * + * Handles TCP seq tracking, pre-accept receive_queue draining, FIN detection, + * and receive window updates. 
The actual data read is delegated to @actor. + * + * Caller must hold a psock ref. Socket lock is acquired/released internally. + * Returns bytes read, or negative error. + */ +static int __tcp_bpf_recvmsg_parser(struct sock *sk, struct sk_psock *psock, + sk_msg_read_actor_t actor, void *actor_arg, + size_t len, int flags) { int peek = flags & MSG_PEEK; - struct sk_psock *psock; - struct tcp_sock *tcp; + struct tcp_sock *tcp = tcp_sk(sk); int copied_from_self = 0; int copied = 0; u32 seq; - if (unlikely(flags & MSG_ERRQUEUE)) - return inet_recv_error(sk, msg, len, addr_len); - - if (!len) - return 0; - - psock = sk_psock_get(sk); - if (unlikely(!psock)) - return tcp_recvmsg(sk, msg, len, flags, addr_len); - lock_sock(sk); - tcp = tcp_sk(sk); seq = tcp->copied_seq; /* We may have received data on the sk_receive_queue pre-accept and * then we can not use read_skb in this context because we haven't @@ -264,7 +259,8 @@ static int tcp_bpf_recvmsg_parser(struct sock *sk, } msg_bytes_ready: - copied = __sk_msg_recvmsg(sk, psock, msg, len, flags, &copied_from_self); + copied = sk_msg_read_core(sk, psock, len, flags, + actor, actor_arg, &copied_from_self); /* The typical case for EFAULT is the socket was gracefully * shutdown with a FIN pkt. So check here the other case is * some error on copy_page_to_iter which would be unexpected. 
@@ -329,10 +325,34 @@ static int tcp_bpf_recvmsg_parser(struct sock *sk, unlock: release_sock(sk); - sk_psock_put(sk, psock); return copied; } +static int tcp_bpf_recvmsg_parser(struct sock *sk, + struct msghdr *msg, + size_t len, + int flags, + int *addr_len) +{ + struct sk_psock *psock; + int ret; + + if (unlikely(flags & MSG_ERRQUEUE)) + return inet_recv_error(sk, msg, len, addr_len); + + if (!len) + return 0; + + psock = sk_psock_get(sk); + if (unlikely(!psock)) + return tcp_recvmsg(sk, msg, len, flags, addr_len); + + ret = __tcp_bpf_recvmsg_parser(sk, psock, + sk_msg_recvmsg_actor, msg, len, flags); + sk_psock_put(sk, psock); + return ret; +} + static int tcp_bpf_ioctl(struct sock *sk, int cmd, int *karg) { bool slow; @@ -351,29 +371,25 @@ static int tcp_bpf_ioctl(struct sock *sk, int cmd, int *karg) return 0; } -static int tcp_bpf_recvmsg(struct sock *sk, struct msghdr *msg, size_t len, - int flags, int *addr_len) +/* + * __tcp_bpf_recvmsg - inner recvmsg for non-parser (verdict only) path + * + * No TCP seq tracking needed (tcp_eat_skb handles it at verdict time). + * Returns bytes read, 0 if caller should fall back to the normal TCP + * read path (data on receive_queue but not in psock), or negative error. + * + * Caller must hold a psock ref. Socket lock is acquired/released internally. 
+ */ +static int __tcp_bpf_recvmsg(struct sock *sk, struct sk_psock *psock, + sk_msg_read_actor_t actor, void *actor_arg, + size_t len, int flags) { - struct sk_psock *psock; int copied, ret; - if (unlikely(flags & MSG_ERRQUEUE)) - return inet_recv_error(sk, msg, len, addr_len); - - if (!len) - return 0; - - psock = sk_psock_get(sk); - if (unlikely(!psock)) - return tcp_recvmsg(sk, msg, len, flags, addr_len); - if (!skb_queue_empty(&sk->sk_receive_queue) && - sk_psock_queue_empty(psock)) { - sk_psock_put(sk, psock); - return tcp_recvmsg(sk, msg, len, flags, addr_len); - } lock_sock(sk); msg_bytes_ready: - copied = sk_msg_recvmsg(sk, psock, msg, len, flags); + copied = sk_msg_read_core(sk, psock, len, flags, + actor, actor_arg, NULL); if (!copied) { long timeo; int data; @@ -388,8 +404,7 @@ static int tcp_bpf_recvmsg(struct sock *sk, struct msghdr *msg, size_t len, if (!sk_psock_queue_empty(psock)) goto msg_bytes_ready; release_sock(sk); - sk_psock_put(sk, psock); - return tcp_recvmsg(sk, msg, len, flags, addr_len); + return 0; } copied = -EAGAIN; } @@ -397,7 +412,35 @@ static int tcp_bpf_recvmsg(struct sock *sk, struct msghdr *msg, size_t len, unlock: release_sock(sk); + return ret; +} + +static int tcp_bpf_recvmsg(struct sock *sk, struct msghdr *msg, size_t len, + int flags, int *addr_len) +{ + struct sk_psock *psock; + int ret; + + if (unlikely(flags & MSG_ERRQUEUE)) + return inet_recv_error(sk, msg, len, addr_len); + + if (!len) + return 0; + + psock = sk_psock_get(sk); + if (unlikely(!psock)) + return tcp_recvmsg(sk, msg, len, flags, addr_len); + if (!skb_queue_empty(&sk->sk_receive_queue) && + sk_psock_queue_empty(psock)) { + sk_psock_put(sk, psock); + return tcp_recvmsg(sk, msg, len, flags, addr_len); + } + + ret = __tcp_bpf_recvmsg(sk, psock, sk_msg_recvmsg_actor, msg, + len, flags); sk_psock_put(sk, psock); + if (!ret) + return tcp_recvmsg(sk, msg, len, flags, addr_len); return ret; } -- 2.43.0

