If the MSG_ZEROCOPY flag is specified with rds_sendmsg() and the SO_ZEROCOPY socket option has been set on the PF_RDS socket, application pages sent down with rds_sendmsg() are pinned.
The pinning uses the accounting infrastructure added by commit a91dbff551a6 ("sock: ulimit on MSG_ZEROCOPY pages"). The payload bytes in the message may not be modified for as long as the message remains pinned. A multi-threaded application using this infrastructure may thus need to be notified about send-completion so that it can free/reuse the buffers passed to rds_sendmsg(). Notification of send-completion will identify each message-buffer by a cookie that the application must specify as ancillary data to rds_sendmsg(). The ancillary data in this case has cmsg_level == SOL_RDS and cmsg_type == RDS_CMSG_ZCOPY_COOKIE. Signed-off-by: Sowmini Varadhan <sowmini.varad...@oracle.com> --- include/uapi/linux/rds.h | 1 + net/rds/message.c | 44 +++++++++++++++++++++++++++++++++++++++++++- net/rds/rds.h | 3 ++- net/rds/send.c | 27 ++++++++++++++++++++++++--- 4 files changed, 70 insertions(+), 5 deletions(-) diff --git a/include/uapi/linux/rds.h b/include/uapi/linux/rds.h index e71d449..09b8cc6 100644 --- a/include/uapi/linux/rds.h +++ b/include/uapi/linux/rds.h @@ -102,6 +102,7 @@ #define RDS_CMSG_ATOMIC_CSWP 7 #define RDS_CMSG_MASKED_ATOMIC_FADD 8 #define RDS_CMSG_MASKED_ATOMIC_CSWP 9 +#define RDS_CMSG_ZCOPY_COOKIE 10 #define RDS_CMSG_RXPATH_LATENCY 11 #define RDS_INFO_FIRST 10000 diff --git a/net/rds/message.c b/net/rds/message.c index 25c74b3..111d5a3 100644 --- a/net/rds/message.c +++ b/net/rds/message.c @@ -337,12 +337,14 @@ struct rds_message *rds_message_map_pages(unsigned long *page_addrs, unsigned in return rm; } -int rds_message_copy_from_user(struct rds_message *rm, struct iov_iter *from) +int rds_message_copy_from_user(struct rds_message *rm, struct iov_iter *from, + struct rds_sock *rs, bool zcopy) { unsigned long to_copy, nbytes; unsigned long sg_off; struct scatterlist *sg; int ret = 0; + int length = iov_iter_count(from); rm->m_inc.i_hdr.h_len = cpu_to_be32(iov_iter_count(from)); @@ -352,6 +354,46 @@ int rds_message_copy_from_user(struct rds_message 
*rm, struct iov_iter *from) sg = rm->data.op_sg; sg_off = 0; /* Dear gcc, sg->page will be null from kzalloc. */ + if (zcopy) { + int total_copied = 0; + size_t zsize = sizeof(struct rds_znotifier); + + rm->data.op_zcopy = 1; + rm->data.op_mmp_znotifier = kzalloc(zsize, GFP_KERNEL); + if (!rm->data.op_mmp_znotifier) + return -ENOMEM; + if (mm_account_pinned_pages(&rm->data.op_mmp_znotifier->z_mmp, + length)) { + kfree(rm->data.op_mmp_znotifier); + rm->data.op_mmp_znotifier = NULL; + return -ENOMEM; + } + while (iov_iter_count(from)) { + struct page *pages; + size_t start; + ssize_t copied; + + copied = iov_iter_get_pages(from, &pages, PAGE_SIZE, + 1, &start); + if (copied < 0) { + /* XXX revert pinning */ + kfree(rm->data.op_mmp_znotifier); + rm->data.op_mmp_znotifier = NULL; + return -EFAULT; + } + total_copied += copied; + iov_iter_advance(from, copied); + length -= copied; + sg_set_page(sg, pages, copied, start); + rm->data.op_nents++; + sg++; + } + WARN_ON_ONCE(length != 0); + return ret; + } /* zcopy */ + + rm->data.op_zcopy = 0; + while (iov_iter_count(from)) { if (!sg_page(sg)) { ret = rds_page_remainder_alloc(sg, iov_iter_count(from), diff --git a/net/rds/rds.h b/net/rds/rds.h index de5015a..884d61c 100644 --- a/net/rds/rds.h +++ b/net/rds/rds.h @@ -781,7 +781,8 @@ void rds_for_each_conn_info(struct socket *sock, unsigned int len, /* message.c */ struct rds_message *rds_message_alloc(unsigned int nents, gfp_t gfp); struct scatterlist *rds_message_alloc_sgs(struct rds_message *rm, int nents); -int rds_message_copy_from_user(struct rds_message *rm, struct iov_iter *from); +int rds_message_copy_from_user(struct rds_message *rm, struct iov_iter *from, + struct rds_sock *rs, bool zcopy); struct rds_message *rds_message_map_pages(unsigned long *page_addrs, unsigned int total_len); void rds_message_populate_header(struct rds_header *hdr, __be16 sport, __be16 dport, u64 seq); diff --git a/net/rds/send.c b/net/rds/send.c index 5c38ce3..e085730 100644 --- 
a/net/rds/send.c +++ b/net/rds/send.c @@ -908,6 +908,7 @@ static int rds_rm_size(struct msghdr *msg, int data_len) case RDS_CMSG_RDMA_DEST: case RDS_CMSG_RDMA_MAP: + case RDS_CMSG_ZCOPY_COOKIE: cmsg_groups |= 2; /* these are valid but do no add any size */ break; @@ -935,6 +936,18 @@ static int rds_rm_size(struct msghdr *msg, int data_len) return size; } +static int rds_cmsg_zcopy(struct rds_sock *rs, struct rds_message *rm, + struct cmsghdr *cmsg) +{ + unsigned int *cookie; + + if (cmsg->cmsg_len < CMSG_LEN(sizeof(*cookie))) + return -EINVAL; + cookie = CMSG_DATA(cmsg); + rm->data.op_mmp_znotifier->z_cookie = *cookie; + return 0; +} + static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm, struct msghdr *msg, int *allocated_mr) { @@ -977,6 +990,10 @@ static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm, ret = rds_cmsg_atomic(rs, rm, cmsg); break; + case RDS_CMSG_ZCOPY_COOKIE: + ret = rds_cmsg_zcopy(rs, rm, cmsg); + break; + default: return -EINVAL; } @@ -1047,10 +1064,12 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len) long timeo = sock_sndtimeo(sk, nonblock); struct rds_conn_path *cpath; size_t total_payload_len = payload_len, rdma_payload_len = 0; + bool zcopy = ((msg->msg_flags & MSG_ZEROCOPY) && + sock_flag(rds_rs_to_sk(rs), SOCK_ZEROCOPY)); /* Mirror Linux UDP mirror of BSD error message compatibility */ /* XXX: Perhaps MSG_MORE someday */ - if (msg->msg_flags & ~(MSG_DONTWAIT | MSG_CMSG_COMPAT)) { + if (msg->msg_flags & ~(MSG_DONTWAIT | MSG_CMSG_COMPAT | MSG_ZEROCOPY)) { ret = -EOPNOTSUPP; goto out; } @@ -1107,12 +1126,14 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len) /* Attach data to the rm */ if (payload_len) { - rm->data.op_sg = rds_message_alloc_sgs(rm, ceil(payload_len, PAGE_SIZE)); + int num_sgs = ceil(payload_len, PAGE_SIZE); + + rm->data.op_sg = rds_message_alloc_sgs(rm, num_sgs); if (!rm->data.op_sg) { ret = -ENOMEM; goto out; } - ret = 
rds_message_copy_from_user(rm, &msg->msg_iter); + ret = rds_message_copy_from_user(rm, &msg->msg_iter, rs, zcopy); if (ret) goto out; } -- 1.7.1