All that sounds good to me; go ahead and add my Acked-by then. Acked-by: Ethan Jackson <et...@nicira.com>
On Thu, Apr 17, 2014 at 4:04 PM, Alex Wang <al...@nicira.com> wrote: > Thanks for the review, Ethan, > > > On Thu, Apr 17, 2014 at 3:50 PM, Ethan Jackson <et...@nicira.com> wrote: >> >> I don't like that dpif_linux_run() has to take the write lock every >> time it's called. Maybe you could make the dpif->refresh_channels >> variable atomic, check it, and only take the lock if it's true. > > > > Yeah, just checked that refresh_channels can only be set/used by the main > thread. So, I'll move > the lock acquisition inside the if-statement. > > >> >> I think a lot of these functions could use annotations saying whether >> they require a read/write lock from the upcall lock. > > > > Thanks for reminding me, I'll add them. My bad. > > > >> >> >> >> Otherwise looks fine to me. >> >> Ethan >> >> On Tue, Apr 15, 2014 at 1:23 PM, Alex Wang <al...@nicira.com> wrote: >> > Signed-off-by: Alex Wang <al...@nicira.com> >> > >> > --- >> > V7 -> V8: >> > - rebase. >> > >> > V6 -> V7: >> > - rebase. >> > - set n_handlers to 0 in destroy_all_channels(). >> > - fix the indexing error in dpif_linux_recv_purge(). >> > >> > V5 -> V6: >> > - move the 'struct dpif_epoll' into 'struct dpif_handler' >> > - fix typos based on review. >> > - refine refresh_channels() comments. >> > - rename 'n_pids' of 'struct dpif_linux_vport' to 'n_upcall_pids'. >> > >> > V4 -> V5: >> > - rebase. >> > >> > V3 -> V4: >> > - add check of handler_id range. >> > >> > V2 -> V3: >> > - use OVS_DP_ATTR_USER_FEATURES to inform datapath about the type of >> > OVS_VPORT_ATTR_UPCALL_PID attribute. >> > - close epoll fd when destroying all channels. >> > >> > PATCH -> V2: >> > - rebase. >> > >> > major changes since RFC: >> > - free the 'upcall_pids' pointer in dpif_linux_refresh_channels(). >> > - for cache access efficiency, in 'recv()' index the handlers >> > array first and then port channels array. 
>> > --- >> > lib/dpif-linux.c | 549 >> > +++++++++++++++++++++++++++++++++++------------------- >> > lib/dpif-linux.h | 3 +- >> > 2 files changed, 361 insertions(+), 191 deletions(-) >> > >> > diff --git a/lib/dpif-linux.c b/lib/dpif-linux.c >> > index e8efdac..87f180e 100644 >> > --- a/lib/dpif-linux.c >> > +++ b/lib/dpif-linux.c >> > @@ -36,6 +36,7 @@ >> > #include "dpif-provider.h" >> > #include "dynamic-string.h" >> > #include "flow.h" >> > +#include "fat-rwlock.h" >> > #include "netdev.h" >> > #include "netdev-linux.h" >> > #include "netdev-vport.h" >> > @@ -132,7 +133,13 @@ struct dpif_channel { >> > long long int last_poll; /* Last time this channel was polled. >> > */ >> > }; >> > >> > -static void report_loss(struct dpif *, struct dpif_channel *); >> > +struct dpif_handler { >> > + struct dpif_channel *channels;/* Array of channels for each >> > handler. */ >> > + struct epoll_event *epoll_events; >> > + int epoll_fd; /* epoll fd that includes channel >> > socks. */ >> > + int n_events; /* Num events returned by >> > epoll_wait(). */ >> > + int event_offset; /* Offset into 'epoll_events'. */ >> > +}; >> > >> > /* Datapath interface for the openvswitch Linux kernel module. */ >> > struct dpif_linux { >> > @@ -140,19 +147,20 @@ struct dpif_linux { >> > int dp_ifindex; >> > >> > /* Upcall messages. */ >> > - struct ovs_mutex upcall_lock; >> > - int uc_array_size; /* Size of 'channels' and >> > 'epoll_events'. */ >> > - struct dpif_channel *channels; >> > - struct epoll_event *epoll_events; >> > - int epoll_fd; /* epoll fd that includes channel >> > socks. */ >> > - int n_events; /* Num events returned by epoll_wait(). >> > */ >> > - int event_offset; /* Offset into 'epoll_events'. */ >> > + struct fat_rwlock upcall_lock; >> > + struct dpif_handler *handlers; >> > + uint32_t n_handlers; /* Num of upcall handlers. */ >> > + int uc_array_size; /* Size of 'handler->channels' and >> > */ >> > + /* 'handler->epoll_events'. */ >> > >> > /* Change notification. 
*/ >> > struct nl_sock *port_notifier; /* vport multicast group subscriber. >> > */ >> > bool refresh_channels; >> > }; >> > >> > +static void report_loss(struct dpif *, struct dpif_channel *, uint32_t >> > ch_idx, >> > + uint32_t handler_id); >> > + >> > static struct vlog_rate_limit error_rl = VLOG_RATE_LIMIT_INIT(9999, 5); >> > >> > /* Generic Netlink family numbers for OVS. >> > @@ -172,8 +180,7 @@ static int dpif_linux_init(void); >> > static int open_dpif(const struct dpif_linux_dp *, struct dpif **); >> > static uint32_t dpif_linux_port_get_pid(const struct dpif *, >> > odp_port_t port_no, uint32_t >> > hash); >> > -static int dpif_linux_refresh_channels(struct dpif *); >> > - >> > +static int dpif_linux_refresh_channels(struct dpif *, uint32_t >> > n_handlers); >> > static void dpif_linux_vport_to_ofpbuf(const struct dpif_linux_vport *, >> > struct ofpbuf *); >> > static int dpif_linux_vport_from_ofpbuf(struct dpif_linux_vport *, >> > @@ -238,6 +245,7 @@ dpif_linux_open(const struct dpif_class *class >> > OVS_UNUSED, const char *name, >> > } >> > dp_request.name = name; >> > dp_request.user_features |= OVS_DP_F_UNALIGNED; >> > + dp_request.user_features |= OVS_DP_F_VPORT_PIDS; >> > error = dpif_linux_dp_transact(&dp_request, &dp, &buf); >> > if (error) { >> > return error; >> > @@ -255,8 +263,7 @@ open_dpif(const struct dpif_linux_dp *dp, struct >> > dpif **dpifp) >> > >> > dpif = xzalloc(sizeof *dpif); >> > dpif->port_notifier = NULL; >> > - ovs_mutex_init(&dpif->upcall_lock); >> > - dpif->epoll_fd = -1; >> > + fat_rwlock_init(&dpif->upcall_lock); >> > >> > dpif_init(&dpif->dpif, &dpif_linux_class, dp->name, >> > dp->dp_ifindex, dp->dp_ifindex); >> > @@ -267,64 +274,110 @@ open_dpif(const struct dpif_linux_dp *dp, struct >> > dpif **dpifp) >> > return 0; >> > } >> > >> > +/* Destroys the netlink sockets pointed by the elements in 'socksp' >> > + * and frees the 'socksp'. 
*/ >> > static void >> > -destroy_channels(struct dpif_linux *dpif) >> > +vport_del_socksp(struct nl_sock **socksp, uint32_t n_socks) >> > { >> > - unsigned int i; >> > + size_t i; >> > >> > - if (dpif->epoll_fd < 0) { >> > - return; >> > + for (i = 0; i < n_socks; i++) { >> > + nl_sock_destroy(socksp[i]); >> > } >> > >> > - for (i = 0; i < dpif->uc_array_size; i++ ) { >> > - struct dpif_linux_vport vport_request; >> > - struct dpif_channel *ch = &dpif->channels[i]; >> > - uint32_t upcall_pid = 0; >> > + free(socksp); >> > +} >> > >> > - if (!ch->sock) { >> > - continue; >> > +/* Creates an array of netlink sockets. Returns an array of the >> > + * corresponding pointers. Records the error in 'error'. */ >> > +static struct nl_sock ** >> > +vport_create_socksp(uint32_t n_socks, int *error) >> > +{ >> > + struct nl_sock **socksp = xzalloc(n_socks * sizeof *socksp); >> > + size_t i; >> > + >> > + for (i = 0; i < n_socks; i++) { >> > + *error = nl_sock_create(NETLINK_GENERIC, &socksp[i]); >> > + if (*error) { >> > + goto error; >> > } >> > + } >> > >> > - epoll_ctl(dpif->epoll_fd, EPOLL_CTL_DEL, nl_sock_fd(ch->sock), >> > NULL); >> > + return socksp; >> > >> > - /* Turn off upcalls. */ >> > - dpif_linux_vport_init(&vport_request); >> > - vport_request.cmd = OVS_VPORT_CMD_SET; >> > - vport_request.dp_ifindex = dpif->dp_ifindex; >> > - vport_request.port_no = u32_to_odp(i); >> > - vport_request.upcall_pid = &upcall_pid; >> > - dpif_linux_vport_transact(&vport_request, NULL, NULL); >> > +error: >> > + vport_del_socksp(socksp, n_socks); >> > >> > - nl_sock_destroy(ch->sock); >> > + return NULL; >> > +} >> > + >> > +/* Given the array of pointers to netlink sockets 'socksp', returns >> > + * the array of corresponding pids. If the 'socksp' is NULL, returns >> > + * a single-element array of value 0. 
*/ >> > +static uint32_t * >> > +vport_socksp_to_pids(struct nl_sock **socksp, uint32_t n_socks) >> > +{ >> > + uint32_t *pids; >> > + >> > + if (!socksp) { >> > + pids = xzalloc(sizeof *pids); >> > + } else { >> > + size_t i; >> > + >> > + pids = xzalloc(n_socks * sizeof *pids); >> > + for (i = 0; i < n_socks; i++) { >> > + pids[i] = nl_sock_pid(socksp[i]); >> > + } >> > } >> > >> > - free(dpif->channels); >> > - dpif->channels = NULL; >> > - dpif->uc_array_size = 0; >> > + return pids; >> > +} >> > + >> > +/* Given the port number 'port_idx', extracts the pids of netlink >> > sockets >> > + * associated to the port and assigns it to 'upcall_pids'. */ >> > +static bool >> > +vport_get_pids(struct dpif_linux *dpif, uint32_t port_idx, >> > + uint32_t **upcall_pids) >> > +{ >> > + uint32_t *pids; >> > + size_t i; >> > >> > - free(dpif->epoll_events); >> > - dpif->epoll_events = NULL; >> > - dpif->n_events = dpif->event_offset = 0; >> > + /* Since the nl_sock can only be assigned in either all >> > + * or none "dpif->handlers" channels, the following check >> > + * would suffice. */ >> > + if (!dpif->handlers[0].channels[port_idx].sock) { >> > + return false; >> > + } >> > + >> > + pids = xzalloc(dpif->n_handlers * sizeof *pids); >> > + >> > + for (i = 0; i < dpif->n_handlers; i++) { >> > + pids[i] = >> > nl_sock_pid(dpif->handlers[i].channels[port_idx].sock); >> > + } >> > + >> > + *upcall_pids = pids; >> > >> > - /* Don't close dpif->epoll_fd since that would cause other threads >> > that >> > - * call dpif_recv_wait() to wait on an arbitrary fd or a closed fd. 
>> > */ >> > + return true; >> > } >> > >> > static int >> > -add_channel(struct dpif_linux *dpif, odp_port_t port_no, struct nl_sock >> > *sock) >> > +vport_add_channels(struct dpif_linux *dpif, odp_port_t port_no, >> > + struct nl_sock **socksp) >> > { >> > struct epoll_event event; >> > uint32_t port_idx = odp_to_u32(port_no); >> > + size_t i, j; >> > + int error; >> > >> > - if (dpif->epoll_fd < 0) { >> > + if (dpif->handlers == NULL) { >> > return 0; >> > } >> > >> > - /* We assume that the datapath densely chooses port numbers, which >> > - * can therefore be used as an index into an array of channels. */ >> > + /* We assume that the datapath densely chooses port numbers, which >> > can >> > + * therefore be used as an index into 'channels' and 'epoll_events' >> > of >> > + * 'dpif->handler'. */ >> > if (port_idx >= dpif->uc_array_size) { >> > uint32_t new_size = port_idx + 1; >> > - uint32_t i; >> > >> > if (new_size > MAX_PORTS) { >> > VLOG_WARN_RL(&error_rl, "%s: datapath port %"PRIu32" too >> > big", >> > @@ -332,52 +385,122 @@ add_channel(struct dpif_linux *dpif, odp_port_t >> > port_no, struct nl_sock *sock) >> > return EFBIG; >> > } >> > >> > - dpif->channels = xrealloc(dpif->channels, >> > - new_size * sizeof *dpif->channels); >> > - for (i = dpif->uc_array_size; i < new_size; i++) { >> > - dpif->channels[i].sock = NULL; >> > - } >> > + for (i = 0; i < dpif->n_handlers; i++) { >> > + struct dpif_handler *handler = &dpif->handlers[i]; >> > + >> > + handler->channels = xrealloc(handler->channels, >> > + new_size * sizeof >> > *handler->channels); >> > + >> > + for (j = dpif->uc_array_size; j < new_size; j++) { >> > + handler->channels[j].sock = NULL; >> > + } >> > + >> > + handler->epoll_events = xrealloc(handler->epoll_events, >> > + new_size * sizeof *handler->epoll_events); >> > >> > - dpif->epoll_events = xrealloc(dpif->epoll_events, >> > - new_size * sizeof >> > *dpif->epoll_events); >> > + } >> > dpif->uc_array_size = new_size; >> > } >> > >> > 
memset(&event, 0, sizeof event); >> > event.events = EPOLLIN; >> > event.data.u32 = port_idx; >> > - if (epoll_ctl(dpif->epoll_fd, EPOLL_CTL_ADD, nl_sock_fd(sock), >> > - &event) < 0) { >> > - return errno; >> > - } >> > >> > - nl_sock_destroy(dpif->channels[port_idx].sock); >> > - dpif->channels[port_idx].sock = sock; >> > - dpif->channels[port_idx].last_poll = LLONG_MIN; >> > + for (i = 0; i < dpif->n_handlers; i++) { >> > + struct dpif_handler *handler = &dpif->handlers[i]; >> > + >> > + if (epoll_ctl(handler->epoll_fd, EPOLL_CTL_ADD, >> > nl_sock_fd(socksp[i]), >> > + &event) < 0) { >> > + error = errno; >> > + goto error; >> > + } >> > + dpif->handlers[i].channels[port_idx].sock = socksp[i]; >> > + dpif->handlers[i].channels[port_idx].last_poll = LLONG_MIN; >> > + } >> > >> > return 0; >> > + >> > +error: >> > + for (j = 0; j < i; j++) { >> > + epoll_ctl(dpif->handlers[j].epoll_fd, EPOLL_CTL_DEL, >> > + nl_sock_fd(socksp[j]), NULL); >> > + dpif->handlers[j].channels[port_idx].sock = NULL; >> > + } >> > + >> > + return error; >> > } >> > >> > static void >> > -del_channel(struct dpif_linux *dpif, odp_port_t port_no) >> > +vport_del_channels(struct dpif_linux *dpif, odp_port_t port_no) >> > { >> > - struct dpif_channel *ch; >> > uint32_t port_idx = odp_to_u32(port_no); >> > + size_t i; >> > >> > - if (dpif->epoll_fd < 0 || port_idx >= dpif->uc_array_size) { >> > + if (!dpif->handlers || port_idx >= dpif->uc_array_size) { >> > return; >> > } >> > >> > - ch = &dpif->channels[port_idx]; >> > - if (!ch->sock) { >> > + /* Since the sock can only be assigned in either all or none >> > + * of "dpif->handlers" channels, the following check would >> > + * suffice. 
*/ >> > + if (!dpif->handlers[0].channels[port_idx].sock) { >> > return; >> > } >> > >> > - epoll_ctl(dpif->epoll_fd, EPOLL_CTL_DEL, nl_sock_fd(ch->sock), >> > NULL); >> > - dpif->event_offset = dpif->n_events = 0; >> > + for (i = 0; i < dpif->n_handlers; i++) { >> > + struct dpif_handler *handler = &dpif->handlers[i]; >> > + >> > + epoll_ctl(handler->epoll_fd, EPOLL_CTL_DEL, >> > + nl_sock_fd(handler->channels[port_idx].sock), NULL); >> > + nl_sock_destroy(handler->channels[port_idx].sock); >> > + handler->channels[port_idx].sock = NULL; >> > + handler->event_offset = handler->n_events = 0; >> > + } >> > +} >> > + >> > +static void >> > +destroy_all_channels(struct dpif_linux *dpif) >> > +{ >> > + unsigned int i; >> > + >> > + if (!dpif->handlers) { >> > + return; >> > + } >> > + >> > + for (i = 0; i < dpif->uc_array_size; i++ ) { >> > + struct dpif_linux_vport vport_request; >> > + uint32_t upcall_pids = 0; >> > + >> > + /* Since the sock can only be assigned in either all or none >> > + * of "dpif->handlers" channels, the following check would >> > + * suffice. */ >> > + if (!dpif->handlers[0].channels[i].sock) { >> > + continue; >> > + } >> > + >> > + /* Turn off upcalls. 
*/ >> > + dpif_linux_vport_init(&vport_request); >> > + vport_request.cmd = OVS_VPORT_CMD_SET; >> > + vport_request.dp_ifindex = dpif->dp_ifindex; >> > + vport_request.port_no = u32_to_odp(i); >> > + vport_request.upcall_pids = &upcall_pids; >> > + dpif_linux_vport_transact(&vport_request, NULL, NULL); >> > + >> > + vport_del_channels(dpif, u32_to_odp(i)); >> > + } >> > + >> > + for (i = 0; i < dpif->n_handlers; i++) { >> > + struct dpif_handler *handler = &dpif->handlers[i]; >> > + >> > + close(handler->epoll_fd); >> > + free(handler->epoll_events); >> > + free(handler->channels); >> > + } >> > >> > - nl_sock_destroy(ch->sock); >> > - ch->sock = NULL; >> > + free(dpif->handlers); >> > + dpif->handlers = NULL; >> > + dpif->n_handlers = 0; >> > + dpif->uc_array_size = 0; >> > } >> > >> > static void >> > @@ -386,11 +509,12 @@ dpif_linux_close(struct dpif *dpif_) >> > struct dpif_linux *dpif = dpif_linux_cast(dpif_); >> > >> > nl_sock_destroy(dpif->port_notifier); >> > - destroy_channels(dpif); >> > - if (dpif->epoll_fd >= 0) { >> > - close(dpif->epoll_fd); >> > - } >> > - ovs_mutex_destroy(&dpif->upcall_lock); >> > + >> > + fat_rwlock_wrlock(&dpif->upcall_lock); >> > + destroy_all_channels(dpif); >> > + fat_rwlock_unlock(&dpif->upcall_lock); >> > + >> > + fat_rwlock_destroy(&dpif->upcall_lock); >> > free(dpif); >> > } >> > >> > @@ -410,10 +534,13 @@ static void >> > dpif_linux_run(struct dpif *dpif_) >> > { >> > struct dpif_linux *dpif = dpif_linux_cast(dpif_); >> > + >> > + fat_rwlock_wrlock(&dpif->upcall_lock); >> > if (dpif->refresh_channels) { >> > dpif->refresh_channels = false; >> > - dpif_linux_refresh_channels(dpif_); >> > + dpif_linux_refresh_channels(dpif_, dpif->n_handlers); >> > } >> > + fat_rwlock_unlock(&dpif->upcall_lock); >> > } >> > >> > static int >> > @@ -506,16 +633,16 @@ dpif_linux_port_add__(struct dpif *dpif_, struct >> > netdev *netdev, >> > namebuf, sizeof >> > namebuf); >> > const char *type = netdev_get_type(netdev); >> > struct 
dpif_linux_vport request, reply; >> > - struct nl_sock *sock = NULL; >> > - uint32_t upcall_pid; >> > struct ofpbuf *buf; >> > uint64_t options_stub[64 / 8]; >> > struct ofpbuf options; >> > - int error; >> > + struct nl_sock **socksp = NULL; >> > + uint32_t *upcall_pids; >> > + int error = 0; >> > >> > - if (dpif->epoll_fd >= 0) { >> > - error = nl_sock_create(NETLINK_GENERIC, &sock); >> > - if (error) { >> > + if (dpif->handlers) { >> > + socksp = vport_create_socksp(dpif->n_handlers, &error); >> > + if (!socksp) { >> > return error; >> > } >> > } >> > @@ -528,7 +655,7 @@ dpif_linux_port_add__(struct dpif *dpif_, struct >> > netdev *netdev, >> > VLOG_WARN_RL(&error_rl, "%s: cannot create port `%s' because it >> > has " >> > "unsupported type `%s'", >> > dpif_name(dpif_), name, type); >> > - nl_sock_destroy(sock); >> > + vport_del_socksp(socksp, dpif->n_handlers); >> > return EINVAL; >> > } >> > request.name = name; >> > @@ -547,27 +674,25 @@ dpif_linux_port_add__(struct dpif *dpif_, struct >> > netdev *netdev, >> > } >> > >> > request.port_no = *port_nop; >> > - upcall_pid = sock ? 
nl_sock_pid(sock) : 0; >> > - request.upcall_pid = &upcall_pid; >> > + upcall_pids = vport_socksp_to_pids(socksp, dpif->n_handlers); >> > + request.n_upcall_pids = dpif->n_handlers; >> > + request.upcall_pids = upcall_pids; >> > >> > error = dpif_linux_vport_transact(&request, &reply, &buf); >> > if (!error) { >> > *port_nop = reply.port_no; >> > - VLOG_DBG("%s: assigning port %"PRIu32" to netlink pid %"PRIu32, >> > - dpif_name(dpif_), reply.port_no, upcall_pid); >> > } else { >> > if (error == EBUSY && *port_nop != ODPP_NONE) { >> > VLOG_INFO("%s: requested port %"PRIu32" is in use", >> > dpif_name(dpif_), *port_nop); >> > } >> > - nl_sock_destroy(sock); >> > - ofpbuf_delete(buf); >> > - return error; >> > + >> > + vport_del_socksp(socksp, dpif->n_handlers); >> > + goto exit; >> > } >> > - ofpbuf_delete(buf); >> > >> > - if (sock) { >> > - error = add_channel(dpif, *port_nop, sock); >> > + if (socksp) { >> > + error = vport_add_channels(dpif, *port_nop, socksp); >> > if (error) { >> > VLOG_INFO("%s: could not add channel for port %s", >> > dpif_name(dpif_), name); >> > @@ -578,13 +703,17 @@ dpif_linux_port_add__(struct dpif *dpif_, struct >> > netdev *netdev, >> > request.dp_ifindex = dpif->dp_ifindex; >> > request.port_no = *port_nop; >> > dpif_linux_vport_transact(&request, NULL, NULL); >> > - >> > - nl_sock_destroy(sock); >> > - return error; >> > + vport_del_socksp(socksp, dpif->n_handlers); >> > + goto exit; >> > } >> > } >> > + free(socksp); >> > >> > - return 0; >> > +exit: >> > + ofpbuf_delete(buf); >> > + free(upcall_pids); >> > + >> > + return error; >> > } >> > >> > static int >> > @@ -594,9 +723,9 @@ dpif_linux_port_add(struct dpif *dpif_, struct >> > netdev *netdev, >> > struct dpif_linux *dpif = dpif_linux_cast(dpif_); >> > int error; >> > >> > - ovs_mutex_lock(&dpif->upcall_lock); >> > + fat_rwlock_wrlock(&dpif->upcall_lock); >> > error = dpif_linux_port_add__(dpif_, netdev, port_nop); >> > - ovs_mutex_unlock(&dpif->upcall_lock); >> > + 
fat_rwlock_unlock(&dpif->upcall_lock); >> > >> > return error; >> > } >> > @@ -614,7 +743,7 @@ dpif_linux_port_del__(struct dpif *dpif_, odp_port_t >> > port_no) >> > vport.port_no = port_no; >> > error = dpif_linux_vport_transact(&vport, NULL, NULL); >> > >> > - del_channel(dpif, port_no); >> > + vport_del_channels(dpif, port_no); >> > >> > return error; >> > } >> > @@ -625,9 +754,9 @@ dpif_linux_port_del(struct dpif *dpif_, odp_port_t >> > port_no) >> > struct dpif_linux *dpif = dpif_linux_cast(dpif_); >> > int error; >> > >> > - ovs_mutex_lock(&dpif->upcall_lock); >> > + fat_rwlock_wrlock(&dpif->upcall_lock); >> > error = dpif_linux_port_del__(dpif_, port_no); >> > - ovs_mutex_unlock(&dpif->upcall_lock); >> > + fat_rwlock_unlock(&dpif->upcall_lock); >> > >> > return error; >> > } >> > @@ -679,21 +808,22 @@ dpif_linux_port_query_by_name(const struct dpif >> > *dpif, const char *devname, >> > >> > static uint32_t >> > dpif_linux_port_get_pid(const struct dpif *dpif_, odp_port_t port_no, >> > - uint32_t hash OVS_UNUSED) >> > + uint32_t hash) >> > { >> > struct dpif_linux *dpif = dpif_linux_cast(dpif_); >> > uint32_t port_idx = odp_to_u32(port_no); >> > uint32_t pid = 0; >> > >> > - ovs_mutex_lock(&dpif->upcall_lock); >> > - if (dpif->epoll_fd >= 0) { >> > + fat_rwlock_rdlock(&dpif->upcall_lock); >> > + if (dpif->handlers) { >> > /* The ODPP_NONE "reserved" port number uses the "ovs-system"'s >> > * channel, since it is not heavily loaded. */ >> > uint32_t idx = port_idx >= dpif->uc_array_size ? 0 : port_idx; >> > - const struct nl_sock *sock = dpif->channels[idx].sock; >> > - pid = sock ? 
nl_sock_pid(sock) : 0; >> > + struct dpif_handler *h = &dpif->handlers[hash % >> > dpif->n_handlers]; >> > + >> > + pid = nl_sock_pid(h->channels[idx].sock); >> > } >> > - ovs_mutex_unlock(&dpif->upcall_lock); >> > + fat_rwlock_unlock(&dpif->upcall_lock); >> > >> > return pid; >> > } >> > @@ -841,7 +971,7 @@ dpif_linux_port_poll(const struct dpif *dpif_, char >> > **devnamep) >> > || vport.cmd == OVS_VPORT_CMD_SET)) { >> > VLOG_DBG("port_changed: dpif:%s vport:%s >> > cmd:%"PRIu8, >> > dpif->dpif.full_name, vport.name, >> > vport.cmd); >> > - if (vport.cmd == OVS_VPORT_CMD_DEL) { >> > + if (vport.cmd == OVS_VPORT_CMD_DEL && >> > dpif->handlers) { >> > dpif->refresh_channels = true; >> > } >> > *devnamep = xstrdup(vport.name); >> > @@ -1322,11 +1452,12 @@ dpif_linux_operate(struct dpif *dpif, struct >> > dpif_op **ops, size_t n_ops) >> > } >> > } >> > >> > -/* Synchronizes 'dpif->channels' with the set of vports currently in >> > 'dpif' in >> > - * the kernel, by adding a new channel for any kernel vport that lacks >> > one and >> > - * deleting any channels that have no backing kernel vports. */ >> > +/* Synchronizes 'channels' in 'dpif->handlers' with the set of vports >> > + * currently in 'dpif' in the kernel, by adding a new set of channels >> > for >> > + * any kernel vport that lacks one and deleting any channels that have >> > no >> > + * backing kernel vports. */ >> > static int >> > -dpif_linux_refresh_channels(struct dpif *dpif_) >> > +dpif_linux_refresh_channels(struct dpif *dpif_, uint32_t n_handlers) >> > { >> > struct dpif_linux *dpif = dpif_linux_cast(dpif_); >> > unsigned long int *keep_channels; >> > @@ -1338,53 +1469,80 @@ dpif_linux_refresh_channels(struct dpif *dpif_) >> > int retval = 0; >> > size_t i; >> > >> > - /* To start with, we need an epoll fd. 
*/ >> > - if (dpif->epoll_fd < 0) { >> > - dpif->epoll_fd = epoll_create(10); >> > - if (dpif->epoll_fd < 0) { >> > - return errno; >> > + if (dpif->n_handlers != n_handlers) { >> > + destroy_all_channels(dpif); >> > + dpif->handlers = xzalloc(n_handlers * sizeof *dpif->handlers); >> > + for (i = 0; i < n_handlers; i++) { >> > + struct dpif_handler *handler = &dpif->handlers[i]; >> > + >> > + handler->epoll_fd = epoll_create(10); >> > + if (handler->epoll_fd < 0) { >> > + size_t j; >> > + >> > + for (j = 0; j < i; j++) { >> > + close(dpif->handlers[j].epoll_fd); >> > + } >> > + free(dpif->handlers); >> > + dpif->handlers = NULL; >> > + >> > + return errno; >> > + } >> > } >> > + dpif->n_handlers = n_handlers; >> > + } >> > + >> > + for (i = 0; i < n_handlers; i++) { >> > + struct dpif_handler *handler = &dpif->handlers[i]; >> > + >> > + handler->event_offset = handler->n_events = 0; >> > } >> > >> > keep_channels_nbits = dpif->uc_array_size; >> > keep_channels = bitmap_allocate(keep_channels_nbits); >> > >> > - dpif->n_events = dpif->event_offset = 0; >> > - >> > ofpbuf_use_stub(&buf, reply_stub, sizeof reply_stub); >> > dpif_linux_port_dump_start__(dpif_, &dump); >> > while (!dpif_linux_port_dump_next__(dpif_, &dump, &vport, &buf)) { >> > uint32_t port_no = odp_to_u32(vport.port_no); >> > - struct nl_sock *sock = (port_no < dpif->uc_array_size >> > - ? 
dpif->channels[port_no].sock >> > - : NULL); >> > - bool new_sock = !sock; >> > + uint32_t *upcall_pids = NULL; >> > int error; >> > >> > - if (new_sock) { >> > - error = nl_sock_create(NETLINK_GENERIC, &sock); >> > + if (port_no >= dpif->uc_array_size >> > + || !vport_get_pids(dpif, port_no, &upcall_pids)) { >> > + struct nl_sock **socksp = >> > vport_create_socksp(dpif->n_handlers, >> > + &error); >> > + >> > + if (!socksp) { >> > + goto error; >> > + } >> > + >> > + error = vport_add_channels(dpif, vport.port_no, socksp); >> > if (error) { >> > + VLOG_INFO("%s: could not add channels for port %s", >> > + dpif_name(dpif_), vport.name); >> > + vport_del_socksp(socksp, dpif->n_handlers); >> > retval = error; >> > goto error; >> > } >> > + upcall_pids = vport_socksp_to_pids(socksp, >> > dpif->n_handlers); >> > + free(socksp); >> > } >> > >> > /* Configure the vport to deliver misses to 'sock'. */ >> > - if (!vport.upcall_pid || *vport.upcall_pid != >> > nl_sock_pid(sock)) { >> > - uint32_t upcall_pid = nl_sock_pid(sock); >> > + if (vport.upcall_pids[0] == 0 >> > + || vport.n_upcall_pids != dpif->n_handlers >> > + || memcmp(upcall_pids, vport.upcall_pids, n_handlers * >> > sizeof >> > + *upcall_pids)) { >> > struct dpif_linux_vport vport_request; >> > >> > dpif_linux_vport_init(&vport_request); >> > vport_request.cmd = OVS_VPORT_CMD_SET; >> > vport_request.dp_ifindex = dpif->dp_ifindex; >> > vport_request.port_no = vport.port_no; >> > - vport_request.upcall_pid = &upcall_pid; >> > + vport_request.n_upcall_pids = dpif->n_handlers; >> > + vport_request.upcall_pids = upcall_pids; >> > error = dpif_linux_vport_transact(&vport_request, NULL, >> > NULL); >> > - if (!error) { >> > - VLOG_DBG("%s: assigning port %"PRIu32" to netlink pid >> > %"PRIu32, >> > - dpif_name(&dpif->dpif), vport_request.port_no, >> > - upcall_pid); >> > - } else { >> > + if (error) { >> > VLOG_WARN_RL(&error_rl, >> > "%s: failed to set upcall pid on port: >> > %s", >> > dpif_name(&dpif->dpif), >> > 
ovs_strerror(error)); >> > @@ -1400,23 +1558,15 @@ dpif_linux_refresh_channels(struct dpif *dpif_) >> > } >> > } >> > >> > - if (new_sock) { >> > - error = add_channel(dpif, vport.port_no, sock); >> > - if (error) { >> > - VLOG_INFO("%s: could not add channel for port %s", >> > - dpif_name(dpif_), vport.name); >> > - retval = error; >> > - goto error; >> > - } >> > - } >> > - >> > if (port_no < keep_channels_nbits) { >> > bitmap_set1(keep_channels, port_no); >> > } >> > + free(upcall_pids); >> > continue; >> > >> > error: >> > - nl_sock_destroy(sock); >> > + free(upcall_pids); >> > + vport_del_channels(dpif, vport.port_no); >> > } >> > nl_dump_done(&dump); >> > ofpbuf_uninit(&buf); >> > @@ -1424,8 +1574,7 @@ dpif_linux_refresh_channels(struct dpif *dpif_) >> > /* Discard any saved channels that we didn't reuse. */ >> > for (i = 0; i < keep_channels_nbits; i++) { >> > if (!bitmap_is_set(keep_channels, i)) { >> > - nl_sock_destroy(dpif->channels[i].sock); >> > - dpif->channels[i].sock = NULL; >> > + vport_del_channels(dpif, u32_to_odp(i)); >> > } >> > } >> > free(keep_channels); >> > @@ -1438,13 +1587,13 @@ dpif_linux_recv_set__(struct dpif *dpif_, bool >> > enable) >> > { >> > struct dpif_linux *dpif = dpif_linux_cast(dpif_); >> > >> > - if ((dpif->epoll_fd >= 0) == enable) { >> > + if ((dpif->handlers != NULL) == enable) { >> > return 0; >> > } else if (!enable) { >> > - destroy_channels(dpif); >> > + destroy_all_channels(dpif); >> > return 0; >> > } else { >> > - return dpif_linux_refresh_channels(dpif_); >> > + return dpif_linux_refresh_channels(dpif_, 1); >> > } >> > } >> > >> > @@ -1454,18 +1603,26 @@ dpif_linux_recv_set(struct dpif *dpif_, bool >> > enable) >> > struct dpif_linux *dpif = dpif_linux_cast(dpif_); >> > int error; >> > >> > - ovs_mutex_lock(&dpif->upcall_lock); >> > + fat_rwlock_wrlock(&dpif->upcall_lock); >> > error = dpif_linux_recv_set__(dpif_, enable); >> > - ovs_mutex_unlock(&dpif->upcall_lock); >> > + fat_rwlock_unlock(&dpif->upcall_lock); 
>> > >> > return error; >> > } >> > >> > static int >> > -dpif_linux_handlers_set(struct dpif *dpif_ OVS_UNUSED, >> > - uint32_t n_handlers OVS_UNUSED) >> > +dpif_linux_handlers_set(struct dpif *dpif_, uint32_t n_handlers) >> > { >> > - return 0; >> > + struct dpif_linux *dpif = dpif_linux_cast(dpif_); >> > + int error = 0; >> > + >> > + fat_rwlock_wrlock(&dpif->upcall_lock); >> > + if (dpif->handlers) { >> > + error = dpif_linux_refresh_channels(dpif_, n_handlers); >> > + } >> > + fat_rwlock_unlock(&dpif->upcall_lock); >> > + >> > + return error; >> > } >> > >> > static int >> > @@ -1543,38 +1700,40 @@ parse_odp_packet(struct ofpbuf *buf, struct >> > dpif_upcall *upcall, >> > } >> > >> > static int >> > -dpif_linux_recv__(struct dpif *dpif_, struct dpif_upcall *upcall, >> > - struct ofpbuf *buf) >> > +dpif_linux_recv__(struct dpif *dpif_, uint32_t handler_id, >> > + struct dpif_upcall *upcall, struct ofpbuf *buf) >> > { >> > struct dpif_linux *dpif = dpif_linux_cast(dpif_); >> > + struct dpif_handler *handler; >> > int read_tries = 0; >> > >> > - if (dpif->epoll_fd < 0) { >> > - return EAGAIN; >> > + if (!dpif->handlers || handler_id >= dpif->n_handlers) { >> > + return EAGAIN; >> > } >> > >> > - if (dpif->event_offset >= dpif->n_events) { >> > + handler = &dpif->handlers[handler_id]; >> > + if (handler->event_offset >= handler->n_events) { >> > int retval; >> > >> > - dpif->event_offset = dpif->n_events = 0; >> > + handler->event_offset = handler->n_events = 0; >> > >> > do { >> > - retval = epoll_wait(dpif->epoll_fd, dpif->epoll_events, >> > + retval = epoll_wait(handler->epoll_fd, >> > handler->epoll_events, >> > dpif->uc_array_size, 0); >> > } while (retval < 0 && errno == EINTR); >> > if (retval < 0) { >> > static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, >> > 1); >> > VLOG_WARN_RL(&rl, "epoll_wait failed (%s)", >> > ovs_strerror(errno)); >> > } else if (retval > 0) { >> > - dpif->n_events = retval; >> > + handler->n_events = retval; >> > } >> > } 
>> > >> > - while (dpif->event_offset < dpif->n_events) { >> > - int idx = dpif->epoll_events[dpif->event_offset].data.u32; >> > - struct dpif_channel *ch = &dpif->channels[idx]; >> > + while (handler->event_offset < handler->n_events) { >> > + int idx = >> > handler->epoll_events[handler->event_offset].data.u32; >> > + struct dpif_channel *ch = >> > &dpif->handlers[handler_id].channels[idx]; >> > >> > - dpif->event_offset++; >> > + handler->event_offset++; >> > >> > for (;;) { >> > int dp_ifindex; >> > @@ -1590,7 +1749,7 @@ dpif_linux_recv__(struct dpif *dpif_, struct >> > dpif_upcall *upcall, >> > * packets that the buffer overflowed. Try again >> > * immediately because there's almost certainly a >> > packet >> > * waiting for us. */ >> > - report_loss(dpif_, ch); >> > + report_loss(dpif_, ch, idx, handler_id); >> > continue; >> > } >> > >> > @@ -1615,29 +1774,31 @@ dpif_linux_recv__(struct dpif *dpif_, struct >> > dpif_upcall *upcall, >> > } >> > >> > static int >> > -dpif_linux_recv(struct dpif *dpif_, uint32_t handler_id OVS_UNUSED, >> > +dpif_linux_recv(struct dpif *dpif_, uint32_t handler_id, >> > struct dpif_upcall *upcall, struct ofpbuf *buf) >> > { >> > struct dpif_linux *dpif = dpif_linux_cast(dpif_); >> > int error; >> > >> > - ovs_mutex_lock(&dpif->upcall_lock); >> > - error = dpif_linux_recv__(dpif_, upcall, buf); >> > - ovs_mutex_unlock(&dpif->upcall_lock); >> > + fat_rwlock_rdlock(&dpif->upcall_lock); >> > + error = dpif_linux_recv__(dpif_, handler_id, upcall, buf); >> > + fat_rwlock_unlock(&dpif->upcall_lock); >> > >> > return error; >> > } >> > >> > static void >> > -dpif_linux_recv_wait(struct dpif *dpif_, uint32_t handler_id >> > OVS_UNUSED) >> > +dpif_linux_recv_wait(struct dpif *dpif_, uint32_t handler_id) >> > { >> > struct dpif_linux *dpif = dpif_linux_cast(dpif_); >> > >> > - ovs_mutex_lock(&dpif->upcall_lock); >> > - if (dpif->epoll_fd >= 0) { >> > - poll_fd_wait(dpif->epoll_fd, POLLIN); >> > + fat_rwlock_rdlock(&dpif->upcall_lock); >> > 
+ if (dpif->handlers && handler_id < dpif->n_handlers) { >> > + struct dpif_handler *handler = &dpif->handlers[handler_id]; >> > + >> > + poll_fd_wait(handler->epoll_fd, POLLIN); >> > } >> > - ovs_mutex_unlock(&dpif->upcall_lock); >> > + fat_rwlock_unlock(&dpif->upcall_lock); >> > } >> > >> > static void >> > @@ -1645,18 +1806,21 @@ dpif_linux_recv_purge(struct dpif *dpif_) >> > { >> > struct dpif_linux *dpif = dpif_linux_cast(dpif_); >> > >> > - ovs_mutex_lock(&dpif->upcall_lock); >> > - if (dpif->epoll_fd >= 0) { >> > - struct dpif_channel *ch; >> > + fat_rwlock_rdlock(&dpif->upcall_lock); >> > + if (dpif->handlers) { >> > + size_t i, j; >> > + >> > + for (i = 0; i < dpif->uc_array_size; i++ ) { >> > + if (!dpif->handlers[0].channels[i].sock) { >> > + continue; >> > + } >> > >> > - for (ch = dpif->channels; ch < >> > &dpif->channels[dpif->uc_array_size]; >> > - ch++) { >> > - if (ch->sock) { >> > - nl_sock_drain(ch->sock); >> > + for (j = 0; j < dpif->n_handlers; j++) { >> > + nl_sock_drain(dpif->handlers[j].channels[i].sock); >> > } >> > } >> > } >> > - ovs_mutex_unlock(&dpif->upcall_lock); >> > + fat_rwlock_unlock(&dpif->upcall_lock); >> > } >> > >> > const struct dpif_class dpif_linux_class = { >> > @@ -1766,7 +1930,7 @@ dpif_linux_vport_from_ofpbuf(struct >> > dpif_linux_vport *vport, >> > [OVS_VPORT_ATTR_PORT_NO] = { .type = NL_A_U32 }, >> > [OVS_VPORT_ATTR_TYPE] = { .type = NL_A_U32 }, >> > [OVS_VPORT_ATTR_NAME] = { .type = NL_A_STRING, .max_len = >> > IFNAMSIZ }, >> > - [OVS_VPORT_ATTR_UPCALL_PID] = { .type = NL_A_U32 }, >> > + [OVS_VPORT_ATTR_UPCALL_PID] = { .type = NL_A_UNSPEC }, >> > [OVS_VPORT_ATTR_STATS] = { NL_POLICY_FOR(struct >> > ovs_vport_stats), >> > .optional = true }, >> > [OVS_VPORT_ATTR_OPTIONS] = { .type = NL_A_NESTED, .optional = >> > true }, >> > @@ -1797,7 +1961,10 @@ dpif_linux_vport_from_ofpbuf(struct >> > dpif_linux_vport *vport, >> > vport->type = nl_attr_get_u32(a[OVS_VPORT_ATTR_TYPE]); >> > vport->name = 
nl_attr_get_string(a[OVS_VPORT_ATTR_NAME]); >> > if (a[OVS_VPORT_ATTR_UPCALL_PID]) { >> > - vport->upcall_pid = nl_attr_get(a[OVS_VPORT_ATTR_UPCALL_PID]); >> > + vport->n_upcall_pids = >> > nl_attr_get_size(a[OVS_VPORT_ATTR_UPCALL_PID]) >> > + / (sizeof *vport->upcall_pids); >> > + vport->upcall_pids = nl_attr_get(a[OVS_VPORT_ATTR_UPCALL_PID]); >> > + >> > } >> > if (a[OVS_VPORT_ATTR_STATS]) { >> > vport->stats = nl_attr_get(a[OVS_VPORT_ATTR_STATS]); >> > @@ -1835,8 +2002,10 @@ dpif_linux_vport_to_ofpbuf(const struct >> > dpif_linux_vport *vport, >> > nl_msg_put_string(buf, OVS_VPORT_ATTR_NAME, vport->name); >> > } >> > >> > - if (vport->upcall_pid) { >> > - nl_msg_put_u32(buf, OVS_VPORT_ATTR_UPCALL_PID, >> > *vport->upcall_pid); >> > + if (vport->upcall_pids) { >> > + nl_msg_put_unspec(buf, OVS_VPORT_ATTR_UPCALL_PID, >> > + vport->upcall_pids, >> > + vport->n_upcall_pids * sizeof >> > *vport->upcall_pids); >> > } >> > >> > if (vport->stats) { >> > @@ -2241,9 +2410,9 @@ dpif_linux_flow_get_stats(const struct >> > dpif_linux_flow *flow, >> > /* Logs information about a packet that was recently lost in 'ch' (in >> > * 'dpif_'). 
*/ >> > static void >> > -report_loss(struct dpif *dpif_, struct dpif_channel *ch) >> > +report_loss(struct dpif *dpif_, struct dpif_channel *ch, uint32_t >> > ch_idx, >> > + uint32_t handler_id) >> > { >> > - struct dpif_linux *dpif = dpif_linux_cast(dpif_); >> > static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5); >> > struct ds s; >> > >> > @@ -2257,7 +2426,7 @@ report_loss(struct dpif *dpif_, struct >> > dpif_channel *ch) >> > time_msec() - ch->last_poll); >> > } >> > >> > - VLOG_WARN("%s: lost packet on channel %"PRIdPTR"%s", >> > - dpif_name(dpif_), ch - dpif->channels, ds_cstr(&s)); >> > + VLOG_WARN("%s: lost packet on port channel %u of handler %u", >> > + dpif_name(dpif_), ch_idx, handler_id); >> > ds_destroy(&s); >> > } >> > diff --git a/lib/dpif-linux.h b/lib/dpif-linux.h >> > index ec94ccf..21d0048 100644 >> > --- a/lib/dpif-linux.h >> > +++ b/lib/dpif-linux.h >> > @@ -41,7 +41,8 @@ struct dpif_linux_vport { >> > * 32-bit boundaries, so use get_unaligned_u64() to access its >> > values. >> > */ >> > const char *name; /* OVS_VPORT_ATTR_NAME. */ >> > - const uint32_t *upcall_pid; /* >> > OVS_VPORT_ATTR_UPCALL_PID. */ >> > + uint32_t n_upcall_pids; >> > + const uint32_t *upcall_pids; /* >> > OVS_VPORT_ATTR_UPCALL_PID. */ >> > const struct ovs_vport_stats *stats; /* OVS_VPORT_ATTR_STATS. */ >> > const struct nlattr *options; /* OVS_VPORT_ATTR_OPTIONS. >> > */ >> > size_t options_len; >> > -- >> > 1.7.9.5 >> > >> > _______________________________________________ >> > dev mailing list >> > dev@openvswitch.org >> > http://openvswitch.org/mailman/listinfo/dev > > _______________________________________________ dev mailing list dev@openvswitch.org http://openvswitch.org/mailman/listinfo/dev