This commit is an optimization that builds on top of commit 01883eda72bd ("rds: support for zcopy completion notification") for PF_RDS sockets.
Cookies associated with zerocopy completion are passed up on the POLLIN channel, piggybacked with data wherever possible. Such cookies are passed up as ancillary data (at level SOL_RDS) in a struct rds_zcopy_cookies when the returned value of recvmsg() is >= 0. A max of SO_EE_ORIGIN_MAX_ZCOOKIES may be passed with each message. Signed-off-by: Sowmini Varadhan <sowmini.varad...@oracle.com> --- include/uapi/linux/rds.h | 8 +++++++ net/rds/recv.c | 47 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 55 insertions(+), 0 deletions(-) diff --git a/include/uapi/linux/rds.h b/include/uapi/linux/rds.h index 12e3bca..e733c01 100644 --- a/include/uapi/linux/rds.h +++ b/include/uapi/linux/rds.h @@ -37,6 +37,8 @@ #include <linux/types.h> #include <linux/socket.h> /* For __kernel_sockaddr_storage. */ +#include <linux/time.h> +#include <linux/errqueue.h> #define RDS_IB_ABI_VERSION 0x301 @@ -104,6 +106,7 @@ #define RDS_CMSG_MASKED_ATOMIC_CSWP 9 #define RDS_CMSG_RXPATH_LATENCY 11 #define RDS_CMSG_ZCOPY_COOKIE 12 +#define RDS_CMSG_ZCOPY_COMPLETION 13 #define RDS_INFO_FIRST 10000 #define RDS_INFO_COUNTERS 10000 @@ -317,6 +320,11 @@ struct rds_rdma_notify { #define RDS_RDMA_DROPPED 3 #define RDS_RDMA_OTHER_ERROR 4 +struct rds_zcopy_cookies { + __u32 num; + __u32 cookies[SO_EE_ORIGIN_MAX_ZCOOKIES]; +}; + /* * Common set of flags for all RDMA related structs */ diff --git a/net/rds/recv.c b/net/rds/recv.c index b080961..44da829 100644 --- a/net/rds/recv.c +++ b/net/rds/recv.c @@ -577,6 +577,43 @@ static int rds_cmsg_recv(struct rds_incoming *inc, struct msghdr *msg, return ret; } +static int rds_recvmsg_zcookie(struct rds_sock *rs, struct msghdr *msg) +{ + struct sk_buff *skb, *tmp; + struct sock_exterr_skb *serr; + struct sock *sk = rds_rs_to_sk(rs); + struct sk_buff_head *q = &sk->sk_error_queue; + struct rds_zcopy_cookies done; + u32 *ptr; + int i; + unsigned long flags; + + spin_lock_irqsave(&q->lock, flags); + if (skb_queue_empty(q)) { + 
spin_unlock_irqrestore(&q->lock, flags); + return 0; + } + skb_queue_walk_safe(q, skb, tmp) { + serr = SKB_EXT_ERR(skb); + if (serr->ee.ee_origin == SO_EE_ORIGIN_ZCOOKIE) { + __skb_unlink(skb, q); + break; + } + } + spin_unlock_irqrestore(&q->lock, flags); + + if (!skb) + return 0; + memset(&done, 0, sizeof(done)); + done.num = serr->ee.ee_data; + ptr = (u32 *)skb->data; + for (i = 0; i < done.num; i++) + done.cookies[i] = *ptr++; + put_cmsg(msg, SOL_RDS, RDS_CMSG_ZCOPY_COMPLETION, sizeof(done), &done); + consume_skb(skb); + return done.num; +} + int rds_recvmsg(struct socket *sock, struct msghdr *msg, size_t size, int msg_flags) { @@ -586,6 +623,7 @@ int rds_recvmsg(struct socket *sock, struct msghdr *msg, size_t size, int ret = 0, nonblock = msg_flags & MSG_DONTWAIT; DECLARE_SOCKADDR(struct sockaddr_in *, sin, msg->msg_name); struct rds_incoming *inc = NULL; + int ncookies; /* udp_recvmsg()->sock_recvtimeo() gets away without locking too.. */ timeo = sock_rcvtimeo(sk, nonblock); @@ -609,6 +647,14 @@ int rds_recvmsg(struct socket *sock, struct msghdr *msg, size_t size, break; } + if (list_empty(&rs->rs_recv_queue) && nonblock) { + ncookies = rds_recvmsg_zcookie(rs, msg); + if (ncookies) { + ret = 0; + break; + } + } + if (!rds_next_incoming(rs, &inc)) { if (nonblock) { ret = -EAGAIN; @@ -656,6 +702,7 @@ int rds_recvmsg(struct socket *sock, struct msghdr *msg, size_t size, msg->msg_flags |= MSG_TRUNC; } + ncookies = rds_recvmsg_zcookie(rs, msg); if (rds_cmsg_recv(inc, msg, rs)) { ret = -EFAULT; goto out; -- 1.7.1