Take ofproto-dpif upcall recv pooling down to the system call interface. Signed-off-by: Jarno Rajahalme <jarno.rajaha...@nsn.com> --- lib/dpif-linux.c | 136 +++++++++++++++++++++++++++++++----------------- lib/dpif-netdev.c | 24 ++++----- lib/dpif-provider.h | 28 ++++++---- lib/dpif.c | 53 ++++++++++++------- lib/dpif.h | 6 ++- lib/netlink-socket.c | 89 +++++++++++++++++++++++++++++++ lib/netlink-socket.h | 11 ++++ ofproto/ofproto-dpif.c | 71 ++++++++++++------------- 8 files changed, 289 insertions(+), 129 deletions(-)
diff --git a/lib/dpif-linux.c b/lib/dpif-linux.c index b6eba39..a44e4a4 100644 --- a/lib/dpif-linux.c +++ b/lib/dpif-linux.c @@ -1271,76 +1271,118 @@ parse_odp_packet(struct ofpbuf *buf, struct dpif_upcall *upcall, return 0; } +#define RECVMMSG_MAX_BATCH 50 + static int -dpif_linux_recv(struct dpif *dpif_, struct dpif_upcall *upcall, - struct ofpbuf *buf) +dpif_linux_recv(struct dpif *dpif_, struct dpif_upcall upcalls[], + struct ofpbuf bufs[], int *n_bufs, + void * buf_space, size_t buf_space_size) { struct dpif_linux *dpif = dpif_linux_cast(dpif_); - int read_tries = 0; + int read_errors = 0; + int n = 0; /* upcall index */ + struct iovec iov[RECVMMSG_MAX_BATCH]; + struct mmsghdr mmsg[RECVMMSG_MAX_BATCH]; + int n_msgs = MIN(*n_bufs, RECVMMSG_MAX_BATCH); + size_t buf_size = (buf_space_size / n_msgs) & ~0x7; + int error = EAGAIN; + + memset(&mmsg, 0, sizeof mmsg); + /* Split the buf_space among the msgs */ + for (n = 0; n < n_msgs; ++n) { + iov[n].iov_base = (char *)buf_space; + iov[n].iov_len = buf_size; + buf_space = (char *)buf_space + buf_size; /* Space for next buf */ + mmsg[n].msg_hdr.msg_iov = &iov[n]; + mmsg[n].msg_hdr.msg_iovlen = 1; + } + + n = 0; if (dpif->epoll_fd < 0) { return EAGAIN; } - if (dpif->event_offset >= dpif->n_events) { - int retval; + while (n < n_msgs) { + /* Check if wrap around */ + if (dpif->event_offset >= dpif->n_events) { + int retval; - dpif->event_offset = dpif->n_events = 0; + dpif->event_offset = dpif->n_events = 0; - do { - retval = epoll_wait(dpif->epoll_fd, dpif->epoll_events, - dpif->uc_array_size, 0); - } while (retval < 0 && errno == EINTR); - if (retval < 0) { - static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1); - VLOG_WARN_RL(&rl, "epoll_wait failed (%s)", strerror(errno)); - } else if (retval > 0) { + do { + retval = epoll_wait(dpif->epoll_fd, dpif->epoll_events, + dpif->uc_array_size, 0); + } while (retval < 0 && errno == EINTR); + if (retval <= 0) { + if (retval < 0) { + static struct vlog_rate_limit rl = 
VLOG_RATE_LIMIT_INIT(1, 1); + VLOG_WARN_RL(&rl, "epoll_wait failed (%s)", strerror(errno)); + } + error = EAGAIN; + goto out; /* stop receiving */ + } dpif->n_events = retval; } - } - while (dpif->event_offset < dpif->n_events) { - int idx = dpif->epoll_events[dpif->event_offset].data.u32; - struct dpif_channel *ch = &dpif->channels[idx]; + while (n < n_msgs && dpif->event_offset < dpif->n_events) { + int idx = dpif->epoll_events[dpif->event_offset].data.u32; + struct dpif_channel *ch = &dpif->channels[idx]; - dpif->event_offset++; + dpif->event_offset++; - for (;;) { - int dp_ifindex; - int error; + for (;;) { + int dp_ifindex; + int count = n_msgs - n; - if (++read_tries > 50) { - return EAGAIN; - } + error = nl_sock_recvm(ch->sock, &mmsg[n], &count, false); - error = nl_sock_recv(ch->sock, buf, false); - if (error == ENOBUFS) { - /* ENOBUFS typically means that we've received so many - * packets that the buffer overflowed. Try again - * immediately because there's almost certainly a packet - * waiting for us. */ - report_loss(dpif_, ch); - continue; - } + if (error == ENOBUFS) { + /* ENOBUFS typically means that we've received so many + * packets that the buffer overflowed. Try again + * immediately because there's almost certainly a packet + * waiting for us. 
*/ + report_loss(dpif_, ch); + if (++read_errors > 50) { + error = EAGAIN; + goto out; + } + continue; + } - ch->last_poll = time_msec(); - if (error) { - if (error == EAGAIN) { - break; + ch->last_poll = time_msec(); + if (error) { + if (error != EAGAIN && ++read_errors > 50) { + goto out; + } + break; /* Skip this channel */ } - return error; - } - error = parse_odp_packet(buf, upcall, &dp_ifindex); - if (!error && dp_ifindex == dpif->dp_ifindex) { - return 0; - } else if (error) { - return error; + count += n; /* One past last index */ + for (;n < count; ++n) { + /* Init ofpbuf */ + ofpbuf_use_stub(&bufs[n], (char *)iov[n].iov_base, + iov[n].iov_len); + bufs[n].size = mmsg[n].msg_len; + + error = parse_odp_packet(&bufs[n], &upcalls[n], + &dp_ifindex); + if (error || dp_ifindex != dpif->dp_ifindex) { + /* Mark upcall invalid */ + upcalls[n].type = DPIF_N_UC_TYPES; + } + } + break; } } } - - return EAGAIN; + out: + /* See what we got */ + *n_bufs = n; + if (n > 0) { + return 0; + } + return error; /* may be zero */ } static void diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c index d315b59..3fa76d8 100644 --- a/lib/dpif-netdev.c +++ b/lib/dpif-netdev.c @@ -978,23 +978,23 @@ find_nonempty_queue(struct dpif *dpif) } static int -dpif_netdev_recv(struct dpif *dpif, struct dpif_upcall *upcall, - struct ofpbuf *buf) +dpif_netdev_recv(struct dpif *dpif, struct dpif_upcall upcalls[], + struct ofpbuf bufs[], int *n_bufs, + void * buf_space OVS_UNUSED, size_t buf_space_size OVS_UNUSED) { - struct dp_netdev_queue *q = find_nonempty_queue(dpif); - if (q) { - struct dp_netdev_upcall *u = &q->upcalls[q->tail++ & QUEUE_MASK]; + int n = 0; - *upcall = u->upcall; - upcall->packet = buf; + struct dp_netdev_queue *q; - ofpbuf_uninit(buf); - *buf = u->buf; + for (n = 0; n < *n_bufs && (q = find_nonempty_queue(dpif)); ++n) { + struct dp_netdev_upcall *u = &q->upcalls[q->tail++ & QUEUE_MASK]; - return 0; - } else { - return EAGAIN; + upcalls[n] = u->upcall; + upcalls[n].packet = 
&bufs[n]; + bufs[n] = u->buf; } + *n_bufs = n; + return (n > 0) ? 0 : EAGAIN; } static void diff --git a/lib/dpif-provider.h b/lib/dpif-provider.h index bea822f..c41827e 100644 --- a/lib/dpif-provider.h +++ b/lib/dpif-provider.h @@ -25,6 +25,7 @@ #include "openflow/openflow.h" #include "dpif.h" #include "util.h" +#include "ofpbuf.h" #ifdef __cplusplus extern "C" { @@ -326,20 +327,27 @@ struct dpif_class { int (*queue_to_priority)(const struct dpif *dpif, uint32_t queue_id, uint32_t *priority); - /* Polls for an upcall from 'dpif'. If successful, stores the upcall into - * '*upcall', using 'buf' for storage. Should only be called if 'recv_set' - * has been used to enable receiving packets from 'dpif'. + /* Polls for upcalls from 'dpif'. If successful, stores the upcalls into + * 'upcalls[]', initializing 'bufs[]' with 'buf_space' for storage. + * Should only be called if 'recv_set' has been used to enable receiving + * packets from 'dpif'. * - * The implementation should point 'upcall->packet' and 'upcall->key' into - * data in the caller-provided 'buf'. If necessary to make room, the - * implementation may expand the data in 'buf'. (This is hardly a great - * way to do things but it works out OK for the dpif providers that exist - * so far.) + * The implementation should initialize the 'bufs[]' needed, preferably + * using 'buf_space' of size 'buf_space_size', and point + * 'upcalls[].packet' and 'upcalls[].key' into 'data' in the 'bufs[]'. + * + * Caller gives the size of the 'upcalls' and 'bufs' arrays in '*n_bufs', + * which also returns the number of received upcalls. + * + * If necessary, the implementation can use or allocate additional space + * for 'bufs[]'. It is thus important the caller frees the first '*n_bufs' + * (as returned) 'bufs' with ofpbuf_uninit(). * * This function must not block. If no upcall is pending when it is * called, it should return EAGAIN without blocking. 
*/ - int (*recv)(struct dpif *dpif, struct dpif_upcall *upcall, - struct ofpbuf *buf); + int (*recv)(struct dpif *dpif, struct dpif_upcall upcalls[], + struct ofpbuf bufs[], int *n_bufs, + void *buf_space, size_t buf_space_size); /* Arranges for the poll loop to wake up when 'dpif' has a message queued * to be received with the recv member function. */ diff --git a/lib/dpif.c b/lib/dpif.c index 6aa52d5..fda7924 100644 --- a/lib/dpif.c +++ b/lib/dpif.c @@ -1116,37 +1116,50 @@ dpif_recv_set(struct dpif *dpif, bool enable) return error; } -/* Polls for an upcall from 'dpif'. If successful, stores the upcall into - * '*upcall', using 'buf' for storage. Should only be called if - * dpif_recv_set() has been used to enable receiving packets on 'dpif'. +/* Polls for upcalls from 'dpif'. If successful, stores the upcalls into + * 'upcalls[]', using 'bufs[]' for storage preferably within 'buf_space' of + * size 'buf_space_size'. Caller gives the size of the arrays in '*n_bufs', + * which also returns the number of received upcalls. + * Should only be called if dpif_recv_set() has been used to enable receiving + * packets on 'dpif'. * - * 'upcall->packet' and 'upcall->key' point into data in the caller-provided - * 'buf', so their memory cannot be freed separately from 'buf'. (This is - * hardly a great way to do things but it works out OK for the dpif providers - * and clients that exist so far.) + * 'upcalls[].packet' and 'upcalls[].key' point into data in 'bufs[]', so their + * memory cannot be freed separately from 'bufs'. The first '*n_bufs' (as + * returned) 'bufs' are initialized by the class implementation, and must be + * freed with ofpbuf_uninit() by the caller after the 'upcalls' have been + * processed. * * Returns 0 if successful, otherwise a positive errno value. Returns EAGAIN * if no upcall is immediately available. 
*/ int -dpif_recv(struct dpif *dpif, struct dpif_upcall *upcall, struct ofpbuf *buf) +dpif_recv(struct dpif *dpif, + struct dpif_upcall upcalls[], struct ofpbuf bufs[], int * n_bufs, + void *buf_space, size_t buf_space_size) { - int error = dpif->dpif_class->recv(dpif, upcall, buf); + int error = dpif->dpif_class->recv(dpif, upcalls, bufs, n_bufs, + buf_space, buf_space_size); + if (!error && !VLOG_DROP_DBG(&dpmsg_rl)) { - struct ds flow; - char *packet; + int i; - packet = ofp_packet_to_string(upcall->packet->data, - upcall->packet->size); + for (i = 0; i < *n_bufs && !VLOG_DROP_DBG(&dpmsg_rl); ++i) { + struct ds flow; + char *packet; - ds_init(&flow); - odp_flow_key_format(upcall->key, upcall->key_len, &flow); + packet = ofp_packet_to_string(upcalls[i].packet->data, + upcalls[i].packet->size); - VLOG_DBG("%s: %s upcall:\n%s\n%s", - dpif_name(dpif), dpif_upcall_type_to_string(upcall->type), - ds_cstr(&flow), packet); + ds_init(&flow); + odp_flow_key_format(upcalls[i].key, upcalls[i].key_len, &flow); - ds_destroy(&flow); - free(packet); + VLOG_DBG("%s: %s upcall:\n%s\n%s", + dpif_name(dpif), + dpif_upcall_type_to_string(upcalls[i].type), + ds_cstr(&flow), packet); + + ds_destroy(&flow); + free(packet); + } } else if (error && error != EAGAIN) { log_operation(dpif, "recv", error); } diff --git a/lib/dpif.h b/lib/dpif.h index c5e3fc8..14e3c64 100644 --- a/lib/dpif.h +++ b/lib/dpif.h @@ -327,6 +327,7 @@ #include "openflow/openflow.h" #include "netdev.h" #include "util.h" +#include "ofpbuf.h" #ifdef __cplusplus extern "C" { @@ -336,7 +337,6 @@ struct dpif; struct ds; struct flow; struct nlattr; -struct ofpbuf; struct sset; struct dpif_class; @@ -557,7 +557,9 @@ struct dpif_upcall { }; int dpif_recv_set(struct dpif *, bool enable); -int dpif_recv(struct dpif *, struct dpif_upcall *, struct ofpbuf *); +int dpif_recv(struct dpif *, struct dpif_upcall upcalls[], + struct ofpbuf bufs[], int * n_bufs, + void * buf_space, size_t buf_space_size); void dpif_recv_purge(struct 
dpif *); void dpif_recv_wait(struct dpif *); diff --git a/lib/netlink-socket.c b/lib/netlink-socket.c index e6b10a1..361cb87 100644 --- a/lib/netlink-socket.c +++ b/lib/netlink-socket.c @@ -420,6 +420,95 @@ nl_sock_recv(struct nl_sock *sock, struct ofpbuf *buf, bool wait) return nl_sock_recv__(sock, buf, wait); } +static int +nl_sock_recvm__(struct nl_sock *sock, struct mmsghdr mmsg[], int *n_msgs, + bool wait) +{ + ssize_t retval; + + int n = 0; + retval = EAGAIN; + for (n = 0; n < *n_msgs; ++n) { + do { + retval = recvmsg(sock->fd, &mmsg[n].msg_hdr, + wait ? 0 : MSG_DONTWAIT); + } while (retval < 0 && errno == EINTR); + if (retval <= 0) + break; + mmsg[n].msg_len = retval; + } + if (n > 0) { + retval = n; + } + + if (retval < 0) { + int error = errno; + if (error == ENOBUFS) { + /* Socket receive buffer overflow dropped one or more messages that + * the kernel tried to send to us. */ + COVERAGE_INC(netlink_overflow); + } + return error; + } + + if (retval > 0) { + int i; + int n_invalid = 0; + int last_good = -1; + + for (i = 0; i < retval; ++i) { + if (mmsg[i].msg_hdr.msg_flags & MSG_TRUNC) { + VLOG_ERR_RL(&rl, "truncated message (longer than %u bytes)", + mmsg[i].msg_len); + mmsg[i].msg_len = 0; /* Mark as invalid */ + ++n_invalid; + } else { + struct nlmsghdr *nlmsghdr; + + nlmsghdr = mmsg[i].msg_hdr.msg_iov->iov_base; + + if (mmsg[i].msg_len < sizeof *nlmsghdr + || mmsg[i].msg_hdr.msg_iov->iov_len < sizeof *nlmsghdr + || nlmsghdr->nlmsg_len < sizeof *nlmsghdr + || nlmsghdr->nlmsg_len > mmsg[i].msg_len) { + VLOG_ERR_RL(&rl, "received invalid nlmsg (%u bytes < %zu)", + mmsg[i].msg_len, sizeof *nlmsghdr); + mmsg[i].msg_len = 0; + ++n_invalid; + } + } + + if (mmsg[i].msg_len > 0) { + last_good = i; + log_nlmsg(__func__, 0, mmsg[i].msg_hdr.msg_iov->iov_base, + mmsg[i].msg_len, sock->protocol); + } + } + if (n_invalid == retval) { + /* We have nothing */ + return E2BIG; + } + /* trim trailing zero-length msgs */ + retval = last_good + 1; + } + + *n_msgs = retval; + 
+ COVERAGE_INC(netlink_received); + + return 0; +} + +int +nl_sock_recvm(struct nl_sock *sock, struct mmsghdr mmsg[], int *n_msgs, bool wait) +{ + int error = nl_sock_cow__(sock); + if (error) { + return error; + } + return nl_sock_recvm__(sock, mmsg, n_msgs, wait); +} + static void nl_sock_record_errors__(struct nl_transaction **transactions, size_t n, int error) diff --git a/lib/netlink-socket.h b/lib/netlink-socket.h index 78dd7b2..9827f08 100644 --- a/lib/netlink-socket.h +++ b/lib/netlink-socket.h @@ -36,6 +36,8 @@ #include <stddef.h> #include <stdint.h> #include "ofpbuf.h" +#include <sys/socket.h> +#include <linux/version.h> struct nl_sock; @@ -43,6 +45,13 @@ struct nl_sock; #error "netlink-socket.h is only for hosts that support Netlink sockets" #endif +#if LINUX_VERSION_CODE < KERNEL_VERSION(2,6,32) +struct mmsghdr { + struct msghdr msg_hdr; /* Message header */ + unsigned int msg_len; /* Number of received bytes for header */ +}; +#endif + /* Netlink sockets. */ int nl_sock_create(int protocol, struct nl_sock **); int nl_sock_clone(const struct nl_sock *, struct nl_sock **); @@ -55,6 +64,8 @@ int nl_sock_send(struct nl_sock *, const struct ofpbuf *, bool wait); int nl_sock_send_seq(struct nl_sock *, const struct ofpbuf *, uint32_t nlmsg_seq, bool wait); int nl_sock_recv(struct nl_sock *, struct ofpbuf *, bool wait); +int nl_sock_recvm(struct nl_sock *sock, struct mmsghdr mmsg[], int *n_msgs, + bool wait); int nl_sock_transact(struct nl_sock *, const struct ofpbuf *request, struct ofpbuf **replyp); diff --git a/ofproto/ofproto-dpif.c b/ofproto/ofproto-dpif.c index 109e57c..4584987 100644 --- a/ofproto/ofproto-dpif.c +++ b/ofproto/ofproto-dpif.c @@ -3743,8 +3743,14 @@ exit: return error; } +enum upcall_type { SFLOW_UPCALL, MISS_UPCALL, BAD_UPCALL }; + +static enum upcall_type classify_upcall(const struct dpif_upcall *); + +static void handle_sflow_upcall(struct dpif_backer *, const struct dpif_upcall *); + static void -handle_miss_upcalls(struct dpif_backer 
*backer, struct dpif_upcall *upcalls, +do_handle_upcalls(struct dpif_backer *backer, struct dpif_upcall *upcalls, size_t n_upcalls) { struct dpif_upcall *upcall; @@ -3777,6 +3783,19 @@ handle_miss_upcalls(struct dpif_backer *backer, struct dpif_upcall *upcalls, uint32_t hash; int error; + switch (classify_upcall(upcall)) { + case MISS_UPCALL: + /* Handle below. */ + break; + + case SFLOW_UPCALL: + handle_sflow_upcall(backer, upcall); + continue; + + case BAD_UPCALL: + continue; + } + error = ofproto_receive(backer, upcall->packet, upcall->key, upcall->key_len, &flow, &miss->key_fitness, &ofproto, &odp_in_port, &miss->initial_tci); @@ -3867,7 +3886,7 @@ handle_miss_upcalls(struct dpif_backer *backer, struct dpif_upcall *upcalls, hmap_destroy(&todo); } -static enum { SFLOW_UPCALL, MISS_UPCALL, BAD_UPCALL } +static enum upcall_type classify_upcall(const struct dpif_upcall *upcall) { union user_action_cookie cookie; @@ -3925,53 +3944,29 @@ handle_sflow_upcall(struct dpif_backer *backer, static int handle_upcalls(struct dpif_backer *backer, unsigned int max_batch) { - struct dpif_upcall misses[FLOW_MISS_MAX_BATCH]; + struct dpif_upcall upcalls[FLOW_MISS_MAX_BATCH]; struct ofpbuf miss_bufs[FLOW_MISS_MAX_BATCH]; uint64_t miss_buf_stubs[FLOW_MISS_MAX_BATCH][4096 / 8]; - int n_processed; - int n_misses; + int n_upcalls = max_batch; int i; + int error; ovs_assert(max_batch <= FLOW_MISS_MAX_BATCH); - n_misses = 0; - for (n_processed = 0; n_processed < max_batch; n_processed++) { - struct dpif_upcall *upcall = &misses[n_misses]; - struct ofpbuf *buf = &miss_bufs[n_misses]; - int error; - - ofpbuf_use_stub(buf, miss_buf_stubs[n_misses], - sizeof miss_buf_stubs[n_misses]); - error = dpif_recv(backer->dpif, upcall, buf); - if (error) { - ofpbuf_uninit(buf); - break; - } - - switch (classify_upcall(upcall)) { - case MISS_UPCALL: - /* Handle it later. 
*/ - n_misses++; - break; - - case SFLOW_UPCALL: - handle_sflow_upcall(backer, upcall); - ofpbuf_uninit(buf); - break; - - case BAD_UPCALL: - ofpbuf_uninit(buf); - break; - } + error = dpif_recv(backer->dpif, upcalls, miss_bufs, &n_upcalls, + miss_buf_stubs, sizeof(miss_buf_stubs)); + if (error || n_upcalls == 0) { + return 0; } - /* Handle deferred MISS_UPCALL processing. */ - handle_miss_upcalls(backer, misses, n_misses); - for (i = 0; i < n_misses; i++) { + /* Handle upcalls processing. */ + do_handle_upcalls(backer, upcalls, n_upcalls); + + for (i = 0; i < n_upcalls; i++) { ofpbuf_uninit(&miss_bufs[i]); } - return n_processed; + return n_upcalls; } /* Flow expiration. */ -- 1.7.10.4 _______________________________________________ dev mailing list dev@openvswitch.org http://openvswitch.org/mailman/listinfo/dev