Hi Maxime,

> On Apr 9, 2024, at 19:48, Maxime Coquelin <maxime.coque...@redhat.com> wrote:
>
> External email: Use caution opening links or attachments
>
>
> From: David Marchand <david.march...@redhat.com>
>
> Switch to epoll so that the concern over the poll() fd array
> is removed.
> Add a simple list of used entries and track the next free entry.
>
> epoll() is thread safe, we no more need a synchronization
> mechanism and so can remove the notification pipe.
>
> Signed-off-by: David Marchand <david.march...@redhat.com>
> Signed-off-by: Maxime Coquelin <maxime.coque...@redhat.com>
> ---
>  lib/vhost/fd_man.c | 399 ++++++++++++---------------------------------
>  lib/vhost/fd_man.h |   5 +-
>  2 files changed, 106 insertions(+), 298 deletions(-)
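The thread-safety point is indeed the key simplification: per epoll_wait(2), an fd added with epoll_ctl() from another thread while the dispatch thread is blocked in epoll_wait() is taken into account, so the self-pipe wakeup and the sync condvar can go away entirely. Just to illustrate the pattern this relies on, a minimal standalone sketch (not vhost code, untested, build with cc -pthread):

#include <pthread.h>
#include <stdio.h>
#include <string.h>
#include <sys/epoll.h>
#include <unistd.h>

static void *
dispatch(void *arg)
{
	int epfd = *(int *)arg;
	struct epoll_event ev;
	char buf[8];

	for (;;) {
		/* Same 1s timeout as the fdset dispatch loop. */
		if (epoll_wait(epfd, &ev, 1, 1000) <= 0)
			continue;

		if (read(ev.data.fd, buf, sizeof(buf)) > 0)
			printf("got event on fd %d\n", ev.data.fd);
		return NULL;
	}
}

int
main(void)
{
	int pipefd[2];
	int epfd = epoll_create(1);
	struct epoll_event ev;
	pthread_t tid;

	if (epfd < 0 || pipe(pipefd) < 0)
		return 1;

	pthread_create(&tid, NULL, dispatch, &epfd);

	/* Registered while the other thread may already sit in epoll_wait(). */
	memset(&ev, 0, sizeof(ev));
	ev.events = EPOLLIN;
	ev.data.fd = pipefd[0];
	epoll_ctl(epfd, EPOLL_CTL_ADD, pipefd[0], &ev);

	if (write(pipefd[1], "x", 1) != 1)
		return 1;

	pthread_join(tid, NULL);
	return 0;
}

No extra synchronization is needed between the registering thread and the waiting thread, which is exactly what lets this patch drop fdset_sync() and the notification pipe.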
> diff --git a/lib/vhost/fd_man.c b/lib/vhost/fd_man.c
> index 8b47c97d45..a4a2965da1 100644
> --- a/lib/vhost/fd_man.c
> +++ b/lib/vhost/fd_man.c
> @@ -3,9 +3,9 @@
>   */
>
>  #include <errno.h>
> -#include <pthread.h>
>  #include <stdio.h>
>  #include <string.h>
> +#include <sys/epoll.h>
>  #include <unistd.h>
>
>  #include <rte_common.h>
> @@ -21,49 +21,34 @@ RTE_LOG_REGISTER_SUFFIX(vhost_fdset_logtype, fdset, INFO);
>  #define VHOST_FDMAN_LOG(level, ...) \
>  	RTE_LOG_LINE(level, VHOST_FDMAN, "" __VA_ARGS__)
>
> -#define FDPOLLERR (POLLERR | POLLHUP | POLLNVAL)
> -
>  struct fdentry {
>  	int fd;		/* -1 indicates this entry is empty */
>  	fd_cb rcb;	/* callback when this fd is readable. */
>  	fd_cb wcb;	/* callback when this fd is writeable.*/
>  	void *dat;	/* fd context */
>  	int busy;	/* whether this entry is being used in cb. */
> +	LIST_ENTRY(fdentry) next;
>  };
>
>  struct fdset {
>  	char name[RTE_THREAD_NAME_SIZE];
> -	struct pollfd rwfds[MAX_FDS];
> +	int epfd;
>  	struct fdentry fd[MAX_FDS];
> +	LIST_HEAD(, fdentry) fdlist;
> +	int next_free_idx;
>  	rte_thread_t tid;
>  	pthread_mutex_t fd_mutex;
> -	pthread_mutex_t fd_polling_mutex;
> -	int num;	/* current fd number of this fdset */
> -
> -	union pipefds {
> -		struct {
> -			int pipefd[2];
> -		};
> -		struct {
> -			int readfd;
> -			int writefd;
> -		};
> -	} u;
> -
> -	pthread_mutex_t sync_mutex;
> -	pthread_cond_t sync_cond;
> -	bool sync;
> +

Not sure this blank line is intended or not :)

>  	bool destroy;
>  };
>
> -static int fdset_add_no_sync(struct fdset *pfdset, int fd, fd_cb rcb, fd_cb wcb, void *dat);
> -static uint32_t fdset_event_dispatch(void *arg);
> -
>  #define MAX_FDSETS 8
>
>  static struct fdset *fdsets[MAX_FDSETS];
>  pthread_mutex_t fdsets_mutex = PTHREAD_MUTEX_INITIALIZER;
>
> +static uint32_t fdset_event_dispatch(void *arg);
> +
>  static struct fdset *
>  fdset_lookup(const char *name)
>  {
> @@ -96,166 +81,6 @@ fdset_insert(struct fdset *fdset)
>  	return -1;
>  }
>
> -static void
> -fdset_pipe_read_cb(int readfd, void *dat,
> -		   int *remove __rte_unused)
> -{
> -	char charbuf[16];
> -	struct fdset *fdset = dat;
> -	int r = read(readfd, charbuf, sizeof(charbuf));
> -	/*
> -	 * Just an optimization, we don't care if read() failed
> -	 * so ignore explicitly its return value to make the
> -	 * compiler happy
> -	 */
> -	RTE_SET_USED(r);
> -
> -	pthread_mutex_lock(&fdset->sync_mutex);
> -	fdset->sync = true;
> -	pthread_cond_broadcast(&fdset->sync_cond);
> -	pthread_mutex_unlock(&fdset->sync_mutex);
> -}
> -
> -static void
> -fdset_pipe_uninit(struct fdset *fdset)
> -{
> -	fdset_del(fdset, fdset->u.readfd);
> -	close(fdset->u.readfd);
> -	fdset->u.readfd = -1;
> -	close(fdset->u.writefd);
> -	fdset->u.writefd = -1;
> -}
> -
> -static int
> -fdset_pipe_init(struct fdset *fdset)
> -{
> -	int ret;
> -
> -	pthread_mutex_init(&fdset->sync_mutex, NULL);
> -	pthread_cond_init(&fdset->sync_cond, NULL);
> -
> -	if (pipe(fdset->u.pipefd) < 0) {
> -		VHOST_FDMAN_LOG(ERR,
> -			"failed to create pipe for vhost fdset");
> -		return -1;
> -	}
> -
> -	ret = fdset_add_no_sync(fdset, fdset->u.readfd,
> -		fdset_pipe_read_cb, NULL, fdset);
> -	if (ret < 0) {
> -		VHOST_FDMAN_LOG(ERR,
> -			"failed to add pipe readfd %d into vhost server fdset",
> -			fdset->u.readfd);
> -
> -		fdset_pipe_uninit(fdset);
> -		return -1;
> -	}
> -
> -	return 0;
> -}
> -
> -static void
> -fdset_sync(struct fdset *fdset)
> -{
> -	int ret;
> -
> -	pthread_mutex_lock(&fdset->sync_mutex);
> -
> -	fdset->sync = false;
> -	ret = write(fdset->u.writefd, "1", 1);
> -	if (ret < 0) {
> -		VHOST_FDMAN_LOG(ERR,
> -			"Failed to write to notification pipe: %s",
> -			strerror(errno));
> -		goto out_unlock;
> -	}
> -
> -	while (!fdset->sync)
> -		pthread_cond_wait(&fdset->sync_cond, &fdset->sync_mutex);
> -
> -out_unlock:
> -	pthread_mutex_unlock(&fdset->sync_mutex);
> -}
> -
> -static int
> -get_last_valid_idx(struct fdset *pfdset, int last_valid_idx)
> -{
> -	int i;
> -
> -	for (i = last_valid_idx; i >= 0 && pfdset->fd[i].fd == -1; i--)
> -		;
> -
> -	return i;
> -}
> -
> -static void
> -fdset_move(struct fdset *pfdset, int dst, int src)
> -{
> -	pfdset->fd[dst] = pfdset->fd[src];
> -	pfdset->rwfds[dst] = pfdset->rwfds[src];
> -}
> -
> -static void
> -fdset_shrink_nolock(struct fdset *pfdset)
> -{
> -	int i;
> -	int last_valid_idx = get_last_valid_idx(pfdset, pfdset->num - 1);
> -
> -	for (i = 0; i < last_valid_idx; i++) {
> -		if (pfdset->fd[i].fd != -1)
> -			continue;
> -
> -		fdset_move(pfdset, i, last_valid_idx);
> -		last_valid_idx = get_last_valid_idx(pfdset, last_valid_idx - 1);
> -	}
> -	pfdset->num = last_valid_idx + 1;
> -}
> -
> -/*
> - * Find deleted fd entries and remove them
> - */
> -static void
> -fdset_shrink(struct fdset *pfdset)
> -{
> -	pthread_mutex_lock(&pfdset->fd_mutex);
> -	fdset_shrink_nolock(pfdset);
> -	pthread_mutex_unlock(&pfdset->fd_mutex);
> -}
> -
> -/**
> - * Returns the index in the fdset for a given fd.
> - * @return
> - *   index for the fd, or -1 if fd isn't in the fdset.
> - */
> -static int
> -fdset_find_fd(struct fdset *pfdset, int fd)
> -{
> -	int i;
> -
> -	for (i = 0; i < pfdset->num && pfdset->fd[i].fd != fd; i++)
> -		;
> -
> -	return i == pfdset->num ? -1 : i;
> -}
> -
> -static void
> -fdset_add_fd(struct fdset *pfdset, int idx, int fd,
> -	fd_cb rcb, fd_cb wcb, void *dat)
> -{
> -	struct fdentry *pfdentry = &pfdset->fd[idx];
> -	struct pollfd *pfd = &pfdset->rwfds[idx];
> -
> -	pfdentry->fd  = fd;
> -	pfdentry->rcb = rcb;
> -	pfdentry->wcb = wcb;
> -	pfdentry->dat = dat;
> -
> -	pfd->fd = fd;
> -	pfd->events  = rcb ? POLLIN : 0;
> -	pfd->events |= wcb ? POLLOUT : 0;
> -	pfd->revents = 0;
> -}
> -
>  struct fdset *
>  fdset_init(const char *name)
>  {
> @@ -284,16 +109,20 @@ fdset_init(const char *name)
>  	rte_strscpy(fdset->name, name, RTE_THREAD_NAME_SIZE);
>
>  	pthread_mutex_init(&fdset->fd_mutex, NULL);
> -	pthread_mutex_init(&fdset->fd_polling_mutex, NULL);
>
> -	for (i = 0; i < MAX_FDS; i++) {
> +	for (i = 0; i < (int)RTE_DIM(fdset->fd); i++) {
>  		fdset->fd[i].fd = -1;
>  		fdset->fd[i].dat = NULL;
>  	}
> -	fdset->num = 0;
> +	LIST_INIT(&fdset->fdlist);
>
> -	if (fdset_pipe_init(fdset)) {
> -		VHOST_FDMAN_LOG(ERR, "Failed to init pipe for %s", name);
> +	/*
> +	 * Any non-zero value would work (see man epoll_create),
> +	 * but pass MAX_FDS for consistency.
> +	 */
> +	fdset->epfd = epoll_create(MAX_FDS);
> +	if (fdset->epfd < 0) {
> +		VHOST_FDMAN_LOG(ERR, "failed to create epoll for %s fdset", name);

failed -> Failed, like the other logs

>  		goto err_free;
>  	}
>
> @@ -301,7 +130,7 @@ fdset_init(const char *name)
>  			fdset_event_dispatch, fdset)) {
>  		VHOST_FDMAN_LOG(ERR, "Failed to create %s event dispatch thread",
>  			fdset->name);
> -		goto err_pipe;
> +		goto err_epoll;
>  	}
>
>  	if (fdset_insert(fdset)) {
> @@ -315,10 +144,9 @@ fdset_init(const char *name)
>
>  err_thread:
>  	fdset->destroy = true;
> -	fdset_sync(fdset);
>  	rte_thread_join(fdset->tid, &val);
> -err_pipe:
> -	fdset_pipe_uninit(fdset);
> +err_epoll:
> +	close(fdset->epfd);
>  err_free:
>  	rte_free(fdset);
>  err_unlock:
> @@ -330,78 +158,99 @@ fdset_init(const char *name)
>  /**
>   * Register the fd in the fdset with read/write handler and context.
>   */
> -static int
> -fdset_add_no_sync(struct fdset *pfdset, int fd, fd_cb rcb, fd_cb wcb, void *dat)
> +int
> +fdset_add(struct fdset *pfdset, int fd, fd_cb rcb, fd_cb wcb, void *dat)
>  {
> -	int i;
> +	struct fdentry *pfdentry;
> +	struct epoll_event ev;
>
>  	if (pfdset == NULL || fd == -1)
>  		return -1;
>
>  	pthread_mutex_lock(&pfdset->fd_mutex);
> -	i = pfdset->num < MAX_FDS ? pfdset->num++ : -1;
> -	if (i == -1) {
> -		pthread_mutex_lock(&pfdset->fd_polling_mutex);
> -		fdset_shrink_nolock(pfdset);
> -		pthread_mutex_unlock(&pfdset->fd_polling_mutex);
> -		i = pfdset->num < MAX_FDS ? pfdset->num++ : -1;
> -		if (i == -1) {
> -			pthread_mutex_unlock(&pfdset->fd_mutex);
> -			return -2;
> -		}
> +	if (pfdset->next_free_idx >= (int)RTE_DIM(pfdset->fd)) {
> +		pthread_mutex_unlock(&pfdset->fd_mutex);
> +		return -2;
>  	}
>
> -	fdset_add_fd(pfdset, i, fd, rcb, wcb, dat);
> +	pfdentry = &pfdset->fd[pfdset->next_free_idx];
> +	pfdentry->fd = fd;
> +	pfdentry->rcb = rcb;
> +	pfdentry->wcb = wcb;
> +	pfdentry->dat = dat;
> +
> +	LIST_INSERT_HEAD(&pfdset->fdlist, pfdentry, next);
> +
> +	/* Find next free slot */
> +	pfdset->next_free_idx++;
> +	for (; pfdset->next_free_idx < (int)RTE_DIM(pfdset->fd); pfdset->next_free_idx++) {
> +		if (pfdset->fd[pfdset->next_free_idx].fd != -1)
> +			continue;
> +		break;
> +	}
>  	pthread_mutex_unlock(&pfdset->fd_mutex);
>
> +	ev.events = EPOLLERR;
> +	ev.events |= rcb ? EPOLLIN : 0;
> +	ev.events |= wcb ? EPOLLOUT : 0;
> +	ev.data.fd = fd;
> +
> +	if (epoll_ctl(pfdset->epfd, EPOLL_CTL_ADD, fd, &ev) == -1)
> +		VHOST_FDMAN_LOG(ERR, "could not add %d fd to %d epfd: %s",
> +			fd, pfdset->epfd, strerror(errno));

Shouldn't we return an error instead of 0 if this fails?
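If we do report it, the entry added above probably also needs to be rolled back, so it does not linger in the array and the list without being registered in epoll. Something along these lines, as an untested sketch on top of this patch:

	if (epoll_ctl(pfdset->epfd, EPOLL_CTL_ADD, fd, &ev) == -1) {
		VHOST_FDMAN_LOG(ERR, "could not add %d fd to %d epfd: %s",
			fd, pfdset->epfd, strerror(errno));

		/* Undo the insertion done under fd_mutex above. */
		pthread_mutex_lock(&pfdset->fd_mutex);
		pfdentry->fd = -1;
		pfdentry->rcb = pfdentry->wcb = NULL;
		pfdentry->dat = NULL;
		LIST_REMOVE(pfdentry, next);
		if (pfdentry - pfdset->fd < pfdset->next_free_idx)
			pfdset->next_free_idx = pfdentry - pfdset->fd;
		pthread_mutex_unlock(&pfdset->fd_mutex);

		return -3;
	}

	return 0;

(-3 is arbitrary here, any negative value distinct from the existing -1/-2 would do. Reusing fdset_del_locked() instead of open-coding the rollback would need it moved above fdset_add(), and would log a spurious EPOLL_CTL_DEL error since the fd never made it into the epfd.)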
> +
>  	return 0;
>  }
>
> -int
> -fdset_add(struct fdset *pfdset, int fd, fd_cb rcb, fd_cb wcb, void *dat)
> +static struct fdentry *
> +fdset_find_entry_locked(struct fdset *pfdset, int fd)
>  {
> -	int ret;
> +	struct fdentry *pfdentry;
>
> -	ret = fdset_add_no_sync(pfdset, fd, rcb, wcb, dat);
> -	if (ret < 0)
> -		return ret;
> +	LIST_FOREACH(pfdentry, &pfdset->fdlist, next) {
> +		if (pfdentry->fd != fd)
> +			continue;
> +		return pfdentry;
> +	}
>
> -	fdset_sync(pfdset);
> +	return NULL;
> +}
>
> -	return 0;
> +static void
> +fdset_del_locked(struct fdset *pfdset, struct fdentry *pfdentry)
> +{
> +	int entry_idx;
> +
> +	if (epoll_ctl(pfdset->epfd, EPOLL_CTL_DEL, pfdentry->fd, NULL) == -1)
> +		VHOST_FDMAN_LOG(ERR, "could not remove %d fd from %d epfd: %s",
> +			pfdentry->fd, pfdset->epfd, strerror(errno));
> +
> +	pfdentry->fd = -1;
> +	pfdentry->rcb = pfdentry->wcb = NULL;
> +	pfdentry->dat = NULL;
> +	entry_idx = pfdentry - pfdset->fd;
> +	if (entry_idx < pfdset->next_free_idx)
> +		pfdset->next_free_idx = entry_idx;
> +	LIST_REMOVE(pfdentry, next);
>  }
>
> -/**
> - * Unregister the fd from the fdset.
> - * Returns context of a given fd or NULL.
> - */
> -void *
> +void
>  fdset_del(struct fdset *pfdset, int fd)
>  {
> -	int i;
> -	void *dat = NULL;
> +	struct fdentry *pfdentry;
>
>  	if (pfdset == NULL || fd == -1)
> -		return NULL;
> +		return;
>
>  	do {
>  		pthread_mutex_lock(&pfdset->fd_mutex);
> -
> -		i = fdset_find_fd(pfdset, fd);
> -		if (i != -1 && pfdset->fd[i].busy == 0) {
> -			/* busy indicates r/wcb is executing! */
> -			dat = pfdset->fd[i].dat;
> -			pfdset->fd[i].fd = -1;
> -			pfdset->fd[i].rcb = pfdset->fd[i].wcb = NULL;
> -			pfdset->fd[i].dat = NULL;
> -			i = -1;
> +		pfdentry = fdset_find_entry_locked(pfdset, fd);
> +		if (pfdentry != NULL && pfdentry->busy == 0) {
> +			fdset_del_locked(pfdset, pfdentry);
> +			pfdentry = NULL;
>  		}
>  		pthread_mutex_unlock(&pfdset->fd_mutex);
> -	} while (i != -1);
> -
> -	fdset_sync(pfdset);
> -
> -	return dat;
> +	} while (pfdentry != NULL);
>  }
>
>  /**
> @@ -415,28 +264,22 @@ fdset_del(struct fdset *pfdset, int fd)
>  int
>  fdset_try_del(struct fdset *pfdset, int fd)
>  {
> -	int i;
> +	struct fdentry *pfdentry;
>
>  	if (pfdset == NULL || fd == -1)
>  		return -2;
>
>  	pthread_mutex_lock(&pfdset->fd_mutex);
> -	i = fdset_find_fd(pfdset, fd);
> -	if (i != -1 && pfdset->fd[i].busy) {
> +	pfdentry = fdset_find_entry_locked(pfdset, fd);
> +	if (pfdentry != NULL && pfdentry->busy != 0) {
>  		pthread_mutex_unlock(&pfdset->fd_mutex);
>  		return -1;
>  	}
>
> -	if (i != -1) {
> -		pfdset->fd[i].fd = -1;
> -		pfdset->fd[i].rcb = pfdset->fd[i].wcb = NULL;
> -		pfdset->fd[i].dat = NULL;
> -	}
> +	if (pfdentry != NULL)
> +		fdset_del_locked(pfdset, pfdentry);
>
>  	pthread_mutex_unlock(&pfdset->fd_mutex);
> -
> -	fdset_sync(pfdset);
> -
>  	return 0;
>  }
>
> @@ -453,53 +296,29 @@ static uint32_t
>  fdset_event_dispatch(void *arg)
>  {
>  	int i;
> -	struct pollfd *pfd;
> -	struct fdentry *pfdentry;
>  	fd_cb rcb, wcb;
>  	void *dat;
>  	int fd, numfds;
>  	int remove1, remove2;
> -	int need_shrink;
>  	struct fdset *pfdset = arg;
> -	int val;
>
>  	if (pfdset == NULL)
>  		return 0;
>
>  	while (1) {
> +		struct epoll_event events[MAX_FDS];
> +		struct fdentry *pfdentry;
>
> -		/*
> -		 * When poll is blocked, other threads might unregister
> -		 * listenfds from and register new listenfds into fdset.
> -		 * When poll returns, the entries for listenfds in the fdset
> -		 * might have been updated. It is ok if there is unwanted call
> -		 * for new listenfds.
> -		 */
> -		pthread_mutex_lock(&pfdset->fd_mutex);
> -		numfds = pfdset->num;
> -		pthread_mutex_unlock(&pfdset->fd_mutex);
> -
> -		pthread_mutex_lock(&pfdset->fd_polling_mutex);
> -		val = poll(pfdset->rwfds, numfds, 1000 /* millisecs */);
> -		pthread_mutex_unlock(&pfdset->fd_polling_mutex);
> -		if (val < 0)
> +		numfds = epoll_wait(pfdset->epfd, events, RTE_DIM(events), 1000);
> +		if (numfds < 0)
>  			continue;
>
> -		need_shrink = 0;
>  		for (i = 0; i < numfds; i++) {
>  			pthread_mutex_lock(&pfdset->fd_mutex);
>
> -			pfdentry = &pfdset->fd[i];
> -			fd = pfdentry->fd;
> -			pfd = &pfdset->rwfds[i];
> -
> -			if (fd < 0) {
> -				need_shrink = 1;
> -				pthread_mutex_unlock(&pfdset->fd_mutex);
> -				continue;
> -			}
> -
> -			if (!pfd->revents) {
> +			fd = events[i].data.fd;
> +			pfdentry = fdset_find_entry_locked(pfdset, fd);
> +			if (pfdentry == NULL) {
>  				pthread_mutex_unlock(&pfdset->fd_mutex);
>  				continue;
>  			}
> @@ -513,9 +332,9 @@ fdset_event_dispatch(void *arg)
>
>  			pthread_mutex_unlock(&pfdset->fd_mutex);
>
> -			if (rcb && pfd->revents & (POLLIN | FDPOLLERR))
> +			if (rcb && events[i].events & (EPOLLIN | EPOLLERR | EPOLLHUP))
>  				rcb(fd, dat, &remove1);
> -			if (wcb && pfd->revents & (POLLOUT | FDPOLLERR))
> +			if (wcb && events[i].events & (EPOLLOUT | EPOLLERR | EPOLLHUP))
>  				wcb(fd, dat, &remove2);
>  			pfdentry->busy = 0;
>  			/*
> @@ -524,23 +343,13 @@
>  			 * directly.
>  			 */
>  			/*
> -			 * When we are to clean up the fd from fdset,
> -			 * because the fd is closed in the cb,
> -			 * the old fd val could be reused by when creates new
> -			 * listen fd in another thread, we couldn't call
> -			 * fdset_del.
> +			 * A concurrent fdset_del may have been waiting for the
> +			 * fdentry not to be busy, so we can't call
> +			 * fdset_del_locked().
>  			 */
> -			if (remove1 || remove2) {
> -				pfdentry->fd = -1;
> -				need_shrink = 1;
> -			}
> +			if (remove1 || remove2)
> +				fdset_del(pfdset, fd);
>  		}
> -
> -		if (need_shrink)
> -			fdset_shrink(pfdset);
> -
> -		if (pfdset->destroy)
> -			break;

I guess we want to keep the destroy logic
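Without it, the rte_thread_join() in the fdset_init() error path never returns. Since the pipe-based wakeup is gone, simply keeping the check in the loop should be enough: the 1s epoll_wait() timeout already bounds how long the join can take. Untested sketch:

		numfds = epoll_wait(pfdset->epfd, events, RTE_DIM(events), 1000);

		/* Seen at most ~1s after fdset_init() sets it, thanks to the
		 * epoll_wait() timeout, so no explicit wakeup is needed. */
		if (pfdset->destroy)
			break;

		if (numfds < 0)
			continue;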
Thanks,
Chenbo

>  	}
>
>  	return 0;
> diff --git a/lib/vhost/fd_man.h b/lib/vhost/fd_man.h
> index 079fa0155f..6398343a6a 100644
> --- a/lib/vhost/fd_man.h
> +++ b/lib/vhost/fd_man.h
> @@ -6,7 +6,7 @@
>  #define _FD_MAN_H_
>  #include <pthread.h>
>  #include <poll.h>
> -#include <stdbool.h>
> +#include <sys/queue.h>
>
>  struct fdset;
>
> @@ -19,8 +19,7 @@ struct fdset *fdset_init(const char *name);
>  int fdset_add(struct fdset *pfdset, int fd,
>  	fd_cb rcb, fd_cb wcb, void *dat);
>
> -void *fdset_del(struct fdset *pfdset, int fd);
> -
> +void fdset_del(struct fdset *pfdset, int fd);
>  int fdset_try_del(struct fdset *pfdset, int fd);
>
>  #endif
> --
> 2.44.0
>