Implement the new optional callbacks io_async_writev and io_async_flush on QIOChannelSocket, but enable them only when the MSG_ZEROCOPY feature is available in the host kernel and TCP sockets are used.
The contents of qio_channel_socket_writev() were moved to a helper function, __qio_channel_socket_writev(), which accepts an extra 'flags' argument. This helper is used to implement both qio_channel_socket_writev(), with flags = 0, keeping its behavior unchanged, and qio_channel_socket_async_writev(), with flags = MSG_ZEROCOPY. qio_channel_socket_async_flush() is implemented by reading the socket's error queue, which carries information on MSG_ZEROCOPY send completion. There is no need to worry about re-sending packets if an error happens, as MSG_ZEROCOPY only works with TCP, which will re-transmit if any error occurs. Notes on using async_writev(): - As MSG_ZEROCOPY tells the kernel to use the same user buffer to avoid copying, some caution is necessary to avoid overwriting any buffer before it is sent. If something like this happens, a newer version of the buffer may be sent instead. - If this is a problem, it is recommended to use async_flush() before freeing or re-using the buffer. - When using MSG_ZEROCOPY, the buffer memory will be locked, so it may require a larger amount of locked memory than is usually available to a non-root user. - If the required amount of locked memory is not available, it falls back to buffer copying behavior and synchronous sending. 
Signed-off-by: Leonardo Bras <leob...@redhat.com> --- include/io/channel-socket.h | 2 + include/io/channel.h | 1 + io/channel-socket.c | 176 ++++++++++++++++++++++++++++++++++-- 3 files changed, 169 insertions(+), 10 deletions(-) diff --git a/include/io/channel-socket.h b/include/io/channel-socket.h index e747e63514..4d1be0637a 100644 --- a/include/io/channel-socket.h +++ b/include/io/channel-socket.h @@ -47,6 +47,8 @@ struct QIOChannelSocket { socklen_t localAddrLen; struct sockaddr_storage remoteAddr; socklen_t remoteAddrLen; + ssize_t async_queued; + ssize_t async_sent; }; diff --git a/include/io/channel.h b/include/io/channel.h index 74f2e3ae8a..611bb2ea26 100644 --- a/include/io/channel.h +++ b/include/io/channel.h @@ -31,6 +31,7 @@ OBJECT_DECLARE_TYPE(QIOChannel, QIOChannelClass, #define QIO_CHANNEL_ERR_BLOCK -2 +#define QIO_CHANNEL_ERR_NOBUFS -3 typedef enum QIOChannelFeature QIOChannelFeature; diff --git a/io/channel-socket.c b/io/channel-socket.c index 606ec97cf7..c67832d0bb 100644 --- a/io/channel-socket.c +++ b/io/channel-socket.c @@ -26,9 +26,23 @@ #include "io/channel-watch.h" #include "trace.h" #include "qapi/clone-visitor.h" +#ifdef CONFIG_LINUX +#include <linux/errqueue.h> +#include <poll.h> +#endif #define SOCKET_MAX_FDS 16 +static ssize_t qio_channel_socket_async_writev(QIOChannel *ioc, + const struct iovec *iov, + size_t niov, + int *fds, + size_t nfds, + Error **errp); + +static void qio_channel_socket_async_flush(QIOChannel *ioc, + Error **errp); + SocketAddress * qio_channel_socket_get_local_address(QIOChannelSocket *ioc, Error **errp) @@ -55,6 +69,8 @@ qio_channel_socket_new(void) sioc = QIO_CHANNEL_SOCKET(object_new(TYPE_QIO_CHANNEL_SOCKET)); sioc->fd = -1; + sioc->async_queued = 0; + sioc->async_sent = 0; ioc = QIO_CHANNEL(sioc); qio_channel_set_feature(ioc, QIO_CHANNEL_FEATURE_SHUTDOWN); @@ -140,6 +156,7 @@ int qio_channel_socket_connect_sync(QIOChannelSocket *ioc, Error **errp) { int fd; + int ret, v = 1; 
trace_qio_channel_socket_connect_sync(ioc, addr); fd = socket_connect(addr, errp); @@ -154,6 +171,19 @@ int qio_channel_socket_connect_sync(QIOChannelSocket *ioc, return -1; } +#ifdef CONFIG_LINUX + if (addr->type != SOCKET_ADDRESS_TYPE_INET) { + return 0; + } + + ret = qemu_setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &v, sizeof(v)); + if (ret >= 0) { + QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); + klass->io_async_writev = qio_channel_socket_async_writev; + klass->io_async_flush = qio_channel_socket_async_flush; + } +#endif + return 0; } @@ -520,12 +550,13 @@ static ssize_t qio_channel_socket_readv(QIOChannel *ioc, return ret; } -static ssize_t qio_channel_socket_writev(QIOChannel *ioc, - const struct iovec *iov, - size_t niov, - int *fds, - size_t nfds, - Error **errp) +static ssize_t __qio_channel_socket_writev(QIOChannel *ioc, + const struct iovec *iov, + size_t niov, + int *fds, + size_t nfds, + int flags, + Error **errp) { QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc); ssize_t ret; @@ -558,20 +589,145 @@ static ssize_t qio_channel_socket_writev(QIOChannel *ioc, } retry: - ret = sendmsg(sioc->fd, &msg, 0); + ret = sendmsg(sioc->fd, &msg, flags); if (ret <= 0) { - if (errno == EAGAIN) { + switch (errno) { + case EAGAIN: return QIO_CHANNEL_ERR_BLOCK; - } - if (errno == EINTR) { + case EINTR: goto retry; + case ENOBUFS: + return QIO_CHANNEL_ERR_NOBUFS; } + error_setg_errno(errp, errno, "Unable to write to socket"); return -1; } return ret; } + +static ssize_t qio_channel_socket_writev(QIOChannel *ioc, + const struct iovec *iov, + size_t niov, + int *fds, + size_t nfds, + Error **errp) +{ + return __qio_channel_socket_writev(ioc, iov, niov, fds, nfds, 0, errp); +} + +static ssize_t qio_channel_socket_async_writev(QIOChannel *ioc, + const struct iovec *iov, + size_t niov, + int *fds, + size_t nfds, + Error **errp) +{ + QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc); + ssize_t ret; + + sioc->async_queued++; + + ret = __qio_channel_socket_writev(ioc, iov, 
niov, fds, nfds, MSG_ZEROCOPY, + errp); + if (ret == QIO_CHANNEL_ERR_NOBUFS) { + /* + * Not enough locked memory available to the process. + * Fallback to default sync callback. + */ + + if (errp && *errp) { + warn_reportf_err(*errp, + "Process can't lock enough memory for using MSG_ZEROCOPY," + "falling back to non-zerocopy"); + } + + QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); + klass->io_async_writev = NULL; + klass->io_async_flush = NULL; + + /* Re-send current buffer */ + ret = qio_channel_socket_writev(ioc, iov, niov, fds, nfds, errp); + } + + return ret; +} + + +static void qio_channel_socket_async_flush(QIOChannel *ioc, + Error **errp) +{ + QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc); + struct msghdr msg = {}; + struct pollfd pfd; + struct sock_extended_err *serr; + struct cmsghdr *cm; + char control[CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS)]; + int ret; + + memset(control, 0, CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS)); + msg.msg_control = control; + msg.msg_controllen = sizeof(control); + + while (sioc->async_sent < sioc->async_queued) { + ret = recvmsg(sioc->fd, &msg, MSG_ERRQUEUE); + if (ret < 0) { + if (errno == EAGAIN) { + /* Nothing on errqueue, wait */ + pfd.fd = sioc->fd; + pfd.events = 0; + ret = poll(&pfd, 1, 250); + if (ret == 0) { + /* + * Timeout : After 250ms without receiving any zerocopy + * notification, consider all data as sent. 
+ */ + break; + } else if (ret < 0 || + (pfd.revents & (POLLERR | POLLHUP | POLLNVAL))) { + error_setg_errno(errp, errno, + "Poll error"); + break; + } else { + continue; + } + } + if (errno == EINTR) { + continue; + } + + error_setg_errno(errp, errno, + "Unable to read errqueue"); + break; + } + + cm = CMSG_FIRSTHDR(&msg); + if (cm->cmsg_level != SOL_IP && + cm->cmsg_type != IP_RECVERR) { + error_setg_errno(errp, EPROTOTYPE, + "Wrong cmsg in errqueue"); + break; + } + + serr = (void *) CMSG_DATA(cm); + if (serr->ee_errno != SO_EE_ORIGIN_NONE) { + error_setg_errno(errp, serr->ee_errno, + "Error on socket"); + break; + } + if (serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) { + error_setg_errno(errp, serr->ee_origin, + "Error not from zerocopy"); + break; + } + + /* No errors, count sent ids*/ + sioc->async_sent += serr->ee_data - serr->ee_info + 1; + } +} + + #else /* WIN32 */ static ssize_t qio_channel_socket_readv(QIOChannel *ioc, const struct iovec *iov, -- 2.33.0