Hi Alex,

Here is my full review:
On Fri, 2014-02-07 at 17:17 -0800, Alex Wang wrote:
> diff --git a/lib/dpif-linux.c b/lib/dpif-linux.c
>
> +/* Adds 'dpif->n_handlers' channels to vport.  If upcall_pids is non-NULL,
> + * makes it point to the array of channel socket pids. */
>  static int
> -add_channel(struct dpif_linux *dpif, odp_port_t port_no, struct nl_sock *sock)
> +add_vport_channels(struct dpif_linux *dpif, odp_port_t port_no,
> +                   uint32_t **upcall_pids)
>  {
>      struct epoll_event event;
> +    struct dpif_epoll *epolls = dpif->epolls;
>      uint32_t port_idx = odp_to_u32(port_no);
> +    uint32_t *pids = NULL;

The variable 'pids' is never used, but it is freed in the error path below.
It can be dropped.

> +    int error = 0;
> +    size_t i, j;
>
> -    if (dpif->epoll_fd < 0) {
> +    if (epolls == NULL) {
>          return 0;
>      }
>
>      /* We assume that the datapath densely chooses port numbers, which
> -     * can therefore be used as an index into an array of channels. */
> +     * can therefore be used as an index into dpif->channels. */
>      if (port_idx >= dpif->uc_array_size) {
>          uint32_t new_size = port_idx + 1;
> -        uint32_t i;
>
>          if (new_size > MAX_PORTS) {
>              VLOG_WARN_RL(&error_rl, "%s: datapath port %"PRIu32" too big",
> @@ -332,49 +318,117 @@ add_channel(struct dpif_linux *dpif, odp_port_t port_no, struct nl_sock *sock)
>          dpif->channels = xrealloc(dpif->channels,
>                                    new_size * sizeof *dpif->channels);
>          for (i = dpif->uc_array_size; i < new_size; i++) {
> -            dpif->channels[i].sock = NULL;
> +            dpif->channels[i] = NULL;
> +        }
> +        for (i = 0; i < dpif->n_handlers; i++) {
> +            epolls[i].epoll_events = xrealloc(epolls[i].epoll_events,
> +                                              new_size * sizeof
> +                                              *epolls[i].epoll_events);
>          }
> -
> -        dpif->epoll_events = xrealloc(dpif->epoll_events,
> -                                      new_size * sizeof *dpif->epoll_events);
>          dpif->uc_array_size = new_size;
>      }
>
>      memset(&event, 0, sizeof event);
>      event.events = EPOLLIN;
>      event.data.u32 = port_idx;
> -    if (epoll_ctl(dpif->epoll_fd, EPOLL_CTL_ADD, nl_sock_fd(sock),
> -                  &event) < 0) {
> -        return errno;
> -    }
>
> -    nl_sock_destroy(dpif->channels[port_idx].sock);
> -    dpif->channels[port_idx].sock = sock;
> -    dpif->channels[port_idx].last_poll = LLONG_MIN;
> +    /* Creates channel for each upcall handler. */
> +    dpif->channels[port_idx] = xzalloc(dpif->n_handlers
> +                                       * sizeof *dpif->channels[port_idx]);
> +    for (i = 0; i < dpif->n_handlers; i++) {
> +        struct nl_sock *sock = NULL;
> +
> +        error = nl_sock_create(NETLINK_GENERIC, &sock);
> +        if (error) {
> +            goto error;
> +        }
> +
> +        if (epoll_ctl(epolls[i].epoll_fd, EPOLL_CTL_ADD, nl_sock_fd(sock),
> +                      &event) < 0) {
> +            error = errno;
> +            goto error;

If this epoll_ctl() fails, the sock created just above is leaked: the error
path only destroys socks already stored in dpif->channels[port_idx], and
this one has not been stored yet.

> +        }
> +        dpif->channels[port_idx][i].sock = sock;
> +        dpif->channels[port_idx][i].last_poll = LLONG_MIN;
> +    }
> +    channels_to_pids(dpif->channels[port_idx], dpif->n_handlers,
> +                     upcall_pids);
>
>      return 0;
> +
> +error:
> +    /* Cleans up the already created channel and socks. */
> +    for (j = 0; j < i; j++) {
> +        nl_sock_destroy(dpif->channels[port_idx][j].sock);
> +    }
> +    free(pids);
> +    free(dpif->channels[port_idx]);
> +    dpif->channels[port_idx] = NULL;
> +
> +    return error;
>  }
> ...
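To make both points concrete, the loop and its error path could look roughly
like this, with 'pids' dropped and the not-yet-stored sock destroyed on the
epoll_ctl() failure path (just a sketch against your patch, untested):

    for (i = 0; i < dpif->n_handlers; i++) {
        struct nl_sock *sock = NULL;

        error = nl_sock_create(NETLINK_GENERIC, &sock);
        if (error) {
            goto error;
        }

        if (epoll_ctl(epolls[i].epoll_fd, EPOLL_CTL_ADD, nl_sock_fd(sock),
                      &event) < 0) {
            error = errno;
            /* 'sock' has not been stored in dpif->channels[port_idx] yet,
             * so the cleanup loop below cannot reach it.  Destroy it here. */
            nl_sock_destroy(sock);
            goto error;
        }
        dpif->channels[port_idx][i].sock = sock;
        dpif->channels[port_idx][i].last_poll = LLONG_MIN;
    }
    channels_to_pids(dpif->channels[port_idx], dpif->n_handlers,
                     upcall_pids);

    return 0;

error:
    /* Every sock created so far is reachable through the channels array. */
    for (j = 0; j < i; j++) {
        nl_sock_destroy(dpif->channels[port_idx][j].sock);
    }
    free(dpif->channels[port_idx]);
    dpif->channels[port_idx] = NULL;

    return error;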
> +static void
> +destroy_all_channels(struct dpif_linux *dpif)
> +{
> +    unsigned int i;
> +
> +    if (!dpif->epolls) {
>          return;
>      }
>
> -    epoll_ctl(dpif->epoll_fd, EPOLL_CTL_DEL, nl_sock_fd(ch->sock), NULL);
> -    dpif->event_offset = dpif->n_events = 0;
> +    for (i = 0; i < dpif->uc_array_size; i++ ) {
> +        struct dpif_linux_vport vport_request;
> +        struct dpif_channel *ch = dpif->channels[i];
> +
> +        if (!ch->sock) {

This condition should be "if (!ch)": 'ch' is now a (possibly NULL) array of
channels, so dereferencing ch->sock crashes for a port that has no channels.

> +            continue;
> +        }
> ...
>
> @@ -500,20 +551,12 @@ dpif_linux_port_add__(struct dpif *dpif_, struct netdev *netdev,
>                                            namebuf, sizeof namebuf);
>      const char *type = netdev_get_type(netdev);
>      struct dpif_linux_vport request, reply;
> -    struct nl_sock *sock = NULL;
> -    uint32_t upcall_pid;
>      struct ofpbuf *buf;
>      uint64_t options_stub[64 / 8];
>      struct ofpbuf options;
> +    uint32_t *upcall_pids = NULL;
>      int error;
>
> -    if (dpif->epoll_fd >= 0) {
> -        error = nl_sock_create(NETLINK_GENERIC, &sock);
> -        if (error) {
> -            return error;
> -        }
> -    }
> -
>      dpif_linux_vport_init(&request);
>      request.cmd = OVS_VPORT_CMD_NEW;
>      request.dp_ifindex = dpif->dp_ifindex;
> @@ -522,7 +565,6 @@ dpif_linux_port_add__(struct dpif *dpif_, struct netdev *netdev,
>          VLOG_WARN_RL(&error_rl, "%s: cannot create port `%s' because it has "
>                       "unsupported type `%s'",
>                       dpif_name(dpif_), name, type);
> -        nl_sock_destroy(sock);
>          return EINVAL;
>      }
>      request.name = name;
> @@ -541,41 +583,42 @@ dpif_linux_port_add__(struct dpif *dpif_, struct netdev *netdev,
>      }
>
>      request.port_no = *port_nop;
> -    upcall_pid = sock ? nl_sock_pid(sock) : 0;
> -    request.upcall_pid = &upcall_pid;
> +    request.upcall_pids = NULL;
>
>      error = dpif_linux_vport_transact(&request, &reply, &buf);
>      if (!error) {
>          *port_nop = reply.port_no;
> -        VLOG_DBG("%s: assigning port %"PRIu32" to netlink pid %"PRIu32,
> -                 dpif_name(dpif_), reply.port_no, upcall_pid);
>      } else {
>          if (error == EBUSY && *port_nop != ODPP_NONE) {
>              VLOG_INFO("%s: requested port %"PRIu32" is in use",
>                        dpif_name(dpif_), *port_nop);
>          }
> -        nl_sock_destroy(sock);
>          ofpbuf_delete(buf);
>          return error;
>      }
>      ofpbuf_delete(buf);
>
> -    if (sock) {
> -        error = add_channel(dpif, *port_nop, sock);
> -        if (error) {
> -            VLOG_INFO("%s: could not add channel for port %s",
> -                      dpif_name(dpif_), name);
> +    if (add_vport_channels(dpif, *port_nop, &upcall_pids)) {
> +        VLOG_INFO("%s: could not add channel for port %s",
> +                  dpif_name(dpif_), name);
>
> -            /* Delete the port. */
> -            dpif_linux_vport_init(&request);
> -            request.cmd = OVS_VPORT_CMD_DEL;
> -            request.dp_ifindex = dpif->dp_ifindex;
> -            request.port_no = *port_nop;
> -            dpif_linux_vport_transact(&request, NULL, NULL);
> +        /* Delete the port. */
> +        dpif_linux_vport_init(&request);
> +        request.cmd = OVS_VPORT_CMD_DEL;
> +        request.dp_ifindex = dpif->dp_ifindex;
> +        request.port_no = *port_nop;
> +        dpif_linux_vport_transact(&request, NULL, NULL);
>
> -            nl_sock_destroy(sock);
> -            return error;
> -        }
> +        return error;
> +    } else {
> +        dpif_linux_vport_init(&request);
> +        request.cmd = OVS_VPORT_CMD_SET;
> +        request.dp_ifindex = dpif->dp_ifindex;
> +        request.port_no = *port_nop;
> +        request.n_pids = dpif->n_handlers;
> +        request.upcall_pids = upcall_pids;
> +        dpif_linux_vport_transact(&request, NULL, NULL);
> +        free(upcall_pids);
>      }
>
>      return 0;

In this function, you split the vport transaction in two: one to create the
vport and another to set the channel pids in the kernel.  This opens a race:
the kernel can start sending upcalls to userspace after the vport has been
added but before the channel pids have been set.  I think it should still be
kept as a single transaction to avoid that race condition.
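For instance, the per-handler socks could be created before the transaction,
so that their pids ride along in the OVS_VPORT_CMD_NEW request itself.
vport_create_socksp(), socksp_to_pids() and vport_add_channels() below are
hypothetical helpers that would split up your add_vport_channels(); this is
only an untested sketch of the idea:

    /* Create one sock per handler up front, before the port exists. */
    error = vport_create_socksp(dpif->n_handlers, &socksp);
    if (error) {
        return error;
    }

    dpif_linux_vport_init(&request);
    request.cmd = OVS_VPORT_CMD_NEW;
    request.dp_ifindex = dpif->dp_ifindex;
    request.port_no = *port_nop;
    /* name, type and options set as before. */
    request.n_pids = dpif->n_handlers;
    request.upcall_pids = socksp_to_pids(socksp, dpif->n_handlers);

    error = dpif_linux_vport_transact(&request, &reply, &buf);
    if (!error) {
        *port_nop = reply.port_no;
        /* Register the already-created socks with the epoll sets under
         * the port number the kernel assigned. */
        error = vport_add_channels(dpif, *port_nop, socksp);
    }

This way the kernel never sees a vport whose upcall pids are still unset.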
...

>
>  static uint32_t
> -dpif_linux_port_get_pid(const struct dpif *dpif_, odp_port_t port_no)
> +dpif_linux_port_get_pid(const struct dpif *dpif_, odp_port_t port_no,
> +                        uint32_t hash)

I suggest passing a struct flow instead of a hash, because the flow-to-handler
mapping is dpif-implementation specific: it is the dpif implementation that
should decide how to use the flow information for pid selection.  E.g., a
dpif-XXX implementation may use a different hash algorithm than
flow_hash_5tuple().

>  {
>      struct dpif_linux *dpif = dpif_linux_cast(dpif_);
>      uint32_t port_idx = odp_to_u32(port_no);
>      uint32_t pid = 0;
>
> -    ovs_mutex_lock(&dpif->upcall_lock);
> -    if (dpif->epoll_fd >= 0) {
> +    fat_rwlock_wrlock(&dpif->upcall_lock);

Should this be fat_rwlock_rdlock()?  I don't see any writes in the critical
section below.

> +    if (dpif->epolls) {
>          /* The ODPP_NONE "reserved" port number uses the "ovs-system"'s
>           * channel, since it is not heavily loaded. */
>          uint32_t idx = port_idx >= dpif->uc_array_size ? 0 : port_idx;
> -        const struct nl_sock *sock = dpif->channels[idx].sock;
> -        pid = sock ? nl_sock_pid(sock) : 0;
> +        const struct dpif_channel *ch = dpif->channels[idx];
> +
> +        if (ch) {
> +            pid = ch[hash % dpif->n_handlers].sock
> +                  ? nl_sock_pid(ch[hash % dpif->n_handlers].sock) : 0;
> +        }
>      }
> -    ovs_mutex_unlock(&dpif->upcall_lock);
> +    fat_rwlock_unlock(&dpif->upcall_lock);
>
>      return pid;
>  }
> ...
>
> @@ -1274,10 +1322,10 @@ dpif_linux_operate(struct dpif *dpif, struct dpif_op **ops, size_t n_ops)
>  }
>
>  /* Synchronizes 'dpif->channels' with the set of vports currently in 'dpif' in
> - * the kernel, by adding a new channel for any kernel vport that lacks one and
> - * deleting any channels that have no backing kernel vports. */
> + * the kernel, by adding a new set of channels for any kernel vport that lacks
> + * one and deleting any channels that have no backing kernel vports. */
>  static int
> -dpif_linux_refresh_channels(struct dpif *dpif_)
> +dpif_linux_refresh_channels(struct dpif *dpif_, uint32_t n_handlers)
>  {
>      struct dpif_linux *dpif = dpif_linux_cast(dpif_);
>      unsigned long int *keep_channels;
> @@ -1287,52 +1335,63 @@ dpif_linux_refresh_channels(struct dpif *dpif_)
>      int retval = 0;
>      size_t i;
>
> -    /* To start with, we need an epoll fd. */
> -    if (dpif->epoll_fd < 0) {
> -        dpif->epoll_fd = epoll_create(10);
> -        if (dpif->epoll_fd < 0) {
> -            return errno;
> +    if (dpif->n_handlers != n_handlers) {
> +        destroy_all_channels(dpif);
> +        dpif->epolls = xzalloc(n_handlers * sizeof *dpif->epolls);
> +
> +        for (i = 0; i < n_handlers; i++) {
> +            dpif->epolls[i].epoll_fd = epoll_create(dpif->uc_array_size ?
> +                                                    dpif->uc_array_size : 10);
> +            if (dpif->epolls[i].epoll_fd < 0) {
> +                return errno;
> +            }
>          }
> +        dpif->n_handlers = n_handlers;
> +    }

Could the n_handlers change be handled more gracefully?  E.g., just enlarge
or shrink the per-port channel arrays, to minimize the traffic interruption
while the thread count changes.
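For example, instead of destroy_all_channels(), something along these lines,
where vport_resize_channels() is a hypothetical helper that xrealloc()s a
port's channel array and creates or destroys only the socks for the handlers
that were added or removed (the epolls array would need the same treatment;
untested sketch):

    if (dpif->n_handlers != n_handlers) {
        for (i = 0; i < dpif->uc_array_size; i++) {
            if (dpif->channels[i]
                && vport_resize_channels(dpif, u32_to_odp(i), n_handlers)) {
                /* On failure, fall back to the current path: destroy
                 * everything and rebuild from scratch. */
                destroy_all_channels(dpif);
                break;
            }
        }
        dpif->n_handlers = n_handlers;
    }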
...

>
>  static int
> -dpif_linux_recv_set__(struct dpif *dpif_, bool enable)
> +dpif_linux_recv_set__(struct dpif *dpif_, bool enable, uint32_t n_handlers)
>  {
>      struct dpif_linux *dpif = dpif_linux_cast(dpif_);
>
> -    if ((dpif->epoll_fd >= 0) == enable) {
> +    if ((dpif->epolls != NULL) == enable) {
> +        if (enable && dpif->n_handlers != n_handlers) {
> +            dpif_linux_refresh_channels(dpif_, n_handlers);

The return value is not checked for errors.  It is also a little odd to do
the refresh here: other dpif implementations would not know that
dpif_recv_set() is expected to do this.  Please see my next comment.

> +        }
>          return 0;
>      } else if (!enable) {
> -        destroy_channels(dpif);
> +        destroy_all_channels(dpif);
>          return 0;
>      } else {
> -        return dpif_linux_refresh_channels(dpif_);
> +        return dpif_linux_refresh_channels(dpif_, n_handlers);
>      }
>  }
> ...
> diff --git a/ofproto/ofproto-dpif.c b/ofproto/ofproto-dpif.c
> index 7b3e1eb..a53b3fd 100644
> --- a/ofproto/ofproto-dpif.c
> +++ b/ofproto/ofproto-dpif.c
> @@ -470,7 +470,8 @@ type_run(const char *type)
>
>          backer->recv_set_enable = true;
>
> -        error = dpif_recv_set(backer->dpif, backer->recv_set_enable);
> +        error = dpif_recv_set(backer->dpif, backer->recv_set_enable,
> +                              n_handlers);
>          if (error) {
>              VLOG_ERR("Failed to enable receiving packets in dpif.");
>              return error;
> @@ -480,6 +481,7 @@ type_run(const char *type)
>          }
>
>          if (backer->recv_set_enable) {
> +            dpif_recv_set(backer->dpif, backer->recv_set_enable, n_handlers);
>              udpif_set_threads(backer->udpif, n_handlers, n_revalidators);
>          }
>

Calling dpif_recv_set() here is somewhat misleading: it is only done to
refresh dpif->n_handlers, not to enable or disable receiving.  Would it be
better to introduce a new dpif interface that explicitly updates
dpif->n_handlers, which udpif_set_threads() could then invoke?

These are all my findings (plus the RCU issue from my previous email).
Otherwise, it looks good to me, and it really simplifies the upcall_handler
a lot!  I would like to see your performance test results.

Thanks!

Best regards,
Han