Hi Chenbo,

On 4/28/24 05:22, Chenbo Xia wrote:
Hi Maxime,

On Apr 9, 2024, at 19:48, Maxime Coquelin <maxime.coque...@redhat.com> wrote:

External email: Use caution opening links or attachments


From: David Marchand <david.march...@redhat.com>

Switch to epoll so that the concern over the poll() fd array
is removed.
Add a simple list of used entries and track the next free entry.

epoll() is thread safe, so we no longer need a synchronization
mechanism and can remove the notification pipe.

Signed-off-by: David Marchand <david.march...@redhat.com>
Signed-off-by: Maxime Coquelin <maxime.coque...@redhat.com>
---
lib/vhost/fd_man.c | 399 ++++++++++++---------------------------------
lib/vhost/fd_man.h |   5 +-
2 files changed, 106 insertions(+), 298 deletions(-)

diff --git a/lib/vhost/fd_man.c b/lib/vhost/fd_man.c
index 8b47c97d45..a4a2965da1 100644
--- a/lib/vhost/fd_man.c
+++ b/lib/vhost/fd_man.c
@@ -3,9 +3,9 @@
  */

#include <errno.h>
-#include <pthread.h>
#include <stdio.h>
#include <string.h>
+#include <sys/epoll.h>
#include <unistd.h>

#include <rte_common.h>
@@ -21,49 +21,34 @@ RTE_LOG_REGISTER_SUFFIX(vhost_fdset_logtype, fdset, INFO);
#define VHOST_FDMAN_LOG(level, ...) \
        RTE_LOG_LINE(level, VHOST_FDMAN, "" __VA_ARGS__)

-#define FDPOLLERR (POLLERR | POLLHUP | POLLNVAL)
-
struct fdentry {
        int fd;         /* -1 indicates this entry is empty */
        fd_cb rcb;      /* callback when this fd is readable. */
        fd_cb wcb;      /* callback when this fd is writeable.*/
        void *dat;      /* fd context */
        int busy;       /* whether this entry is being used in cb. */
+       LIST_ENTRY(fdentry) next;
};

struct fdset {
        char name[RTE_THREAD_NAME_SIZE];
-       struct pollfd rwfds[MAX_FDS];
+       int epfd;
        struct fdentry fd[MAX_FDS];
+       LIST_HEAD(, fdentry) fdlist;
+       int next_free_idx;
        rte_thread_t tid;
        pthread_mutex_t fd_mutex;
-       pthread_mutex_t fd_polling_mutex;
-       int num;        /* current fd number of this fdset */
-
-       union pipefds {
-               struct {
-                       int pipefd[2];
-               };
-               struct {
-                       int readfd;
-                       int writefd;
-               };
-       } u;
-
-       pthread_mutex_t sync_mutex;
-       pthread_cond_t sync_cond;
-       bool sync;
+

Not sure whether this blank line is intended or not :)

It isn't; I'm removing it in the next revision.


        bool destroy;
};

-static int fdset_add_no_sync(struct fdset *pfdset, int fd, fd_cb rcb, fd_cb 
wcb, void *dat);
-static uint32_t fdset_event_dispatch(void *arg);
-
#define MAX_FDSETS 8

static struct fdset *fdsets[MAX_FDSETS];
pthread_mutex_t fdsets_mutex = PTHREAD_MUTEX_INITIALIZER;

+static uint32_t fdset_event_dispatch(void *arg);
+
static struct fdset *
fdset_lookup(const char *name)
{
@@ -96,166 +81,6 @@ fdset_insert(struct fdset *fdset)
        return -1;
}

-static void
-fdset_pipe_read_cb(int readfd, void *dat,
-                  int *remove __rte_unused)
-{
-       char charbuf[16];
-       struct fdset *fdset = dat;
-       int r = read(readfd, charbuf, sizeof(charbuf));
-       /*
-        * Just an optimization, we don't care if read() failed
-        * so ignore explicitly its return value to make the
-        * compiler happy
-        */
-       RTE_SET_USED(r);
-
-       pthread_mutex_lock(&fdset->sync_mutex);
-       fdset->sync = true;
-       pthread_cond_broadcast(&fdset->sync_cond);
-       pthread_mutex_unlock(&fdset->sync_mutex);
-}
-
-static void
-fdset_pipe_uninit(struct fdset *fdset)
-{
-       fdset_del(fdset, fdset->u.readfd);
-       close(fdset->u.readfd);
-       fdset->u.readfd = -1;
-       close(fdset->u.writefd);
-       fdset->u.writefd = -1;
-}
-
-static int
-fdset_pipe_init(struct fdset *fdset)
-{
-       int ret;
-
-       pthread_mutex_init(&fdset->sync_mutex, NULL);
-       pthread_cond_init(&fdset->sync_cond, NULL);
-
-       if (pipe(fdset->u.pipefd) < 0) {
-               VHOST_FDMAN_LOG(ERR,
-                       "failed to create pipe for vhost fdset");
-               return -1;
-       }
-
-       ret = fdset_add_no_sync(fdset, fdset->u.readfd,
-                       fdset_pipe_read_cb, NULL, fdset);
-       if (ret < 0) {
-               VHOST_FDMAN_LOG(ERR,
-                       "failed to add pipe readfd %d into vhost server fdset",
-                       fdset->u.readfd);
-
-               fdset_pipe_uninit(fdset);
-               return -1;
-       }
-
-       return 0;
-}
-
-static void
-fdset_sync(struct fdset *fdset)
-{
-       int ret;
-
-       pthread_mutex_lock(&fdset->sync_mutex);
-
-       fdset->sync = false;
-       ret = write(fdset->u.writefd, "1", 1);
-       if (ret < 0) {
-               VHOST_FDMAN_LOG(ERR,
-                       "Failed to write to notification pipe: %s",
-                       strerror(errno));
-               goto out_unlock;
-       }
-
-       while (!fdset->sync)
-               pthread_cond_wait(&fdset->sync_cond, &fdset->sync_mutex);
-
-out_unlock:
-       pthread_mutex_unlock(&fdset->sync_mutex);
-}
-
-static int
-get_last_valid_idx(struct fdset *pfdset, int last_valid_idx)
-{
-       int i;
-
-       for (i = last_valid_idx; i >= 0 && pfdset->fd[i].fd == -1; i--)
-               ;
-
-       return i;
-}
-
-static void
-fdset_move(struct fdset *pfdset, int dst, int src)
-{
-       pfdset->fd[dst]    = pfdset->fd[src];
-       pfdset->rwfds[dst] = pfdset->rwfds[src];
-}
-
-static void
-fdset_shrink_nolock(struct fdset *pfdset)
-{
-       int i;
-       int last_valid_idx = get_last_valid_idx(pfdset, pfdset->num - 1);
-
-       for (i = 0; i < last_valid_idx; i++) {
-               if (pfdset->fd[i].fd != -1)
-                       continue;
-
-               fdset_move(pfdset, i, last_valid_idx);
-               last_valid_idx = get_last_valid_idx(pfdset, last_valid_idx - 1);
-       }
-       pfdset->num = last_valid_idx + 1;
-}
-
-/*
- * Find deleted fd entries and remove them
- */
-static void
-fdset_shrink(struct fdset *pfdset)
-{
-       pthread_mutex_lock(&pfdset->fd_mutex);
-       fdset_shrink_nolock(pfdset);
-       pthread_mutex_unlock(&pfdset->fd_mutex);
-}
-
-/**
- * Returns the index in the fdset for a given fd.
- * @return
- *   index for the fd, or -1 if fd isn't in the fdset.
- */
-static int
-fdset_find_fd(struct fdset *pfdset, int fd)
-{
-       int i;
-
-       for (i = 0; i < pfdset->num && pfdset->fd[i].fd != fd; i++)
-               ;
-
-       return i == pfdset->num ? -1 : i;
-}
-
-static void
-fdset_add_fd(struct fdset *pfdset, int idx, int fd,
-       fd_cb rcb, fd_cb wcb, void *dat)
-{
-       struct fdentry *pfdentry = &pfdset->fd[idx];
-       struct pollfd *pfd = &pfdset->rwfds[idx];
-
-       pfdentry->fd  = fd;
-       pfdentry->rcb = rcb;
-       pfdentry->wcb = wcb;
-       pfdentry->dat = dat;
-
-       pfd->fd = fd;
-       pfd->events  = rcb ? POLLIN : 0;
-       pfd->events |= wcb ? POLLOUT : 0;
-       pfd->revents = 0;
-}
-
struct fdset *
fdset_init(const char *name)
{
@@ -284,16 +109,20 @@ fdset_init(const char *name)
        rte_strscpy(fdset->name, name, RTE_THREAD_NAME_SIZE);

        pthread_mutex_init(&fdset->fd_mutex, NULL);
-       pthread_mutex_init(&fdset->fd_polling_mutex, NULL);

-       for (i = 0; i < MAX_FDS; i++) {
+       for (i = 0; i < (int)RTE_DIM(fdset->fd); i++) {
                fdset->fd[i].fd = -1;
                fdset->fd[i].dat = NULL;
        }
-       fdset->num = 0;
+       LIST_INIT(&fdset->fdlist);

-       if (fdset_pipe_init(fdset)) {
-               VHOST_FDMAN_LOG(ERR, "Failed to init pipe for %s", name);
+       /*
+        * Any non-zero value would work (see man epoll_create),
+        * but pass MAX_FDS for consistency.
+        */
+       fdset->epfd = epoll_create(MAX_FDS);
+       if (fdset->epfd < 0) {
+               VHOST_FDMAN_LOG(ERR, "failed to create epoll for %s fdset", 
name);

failed -> Failed, like the other logs

Ack.


                goto err_free;
        }

@@ -301,7 +130,7 @@ fdset_init(const char *name)
                                        fdset_event_dispatch, fdset)) {
                VHOST_FDMAN_LOG(ERR, "Failed to create %s event dispatch 
thread",
                                fdset->name);
-               goto err_pipe;
+               goto err_epoll;
        }

        if (fdset_insert(fdset)) {
@@ -315,10 +144,9 @@ fdset_init(const char *name)

err_thread:
        fdset->destroy = true;
-       fdset_sync(fdset);
        rte_thread_join(fdset->tid, &val);
-err_pipe:
-       fdset_pipe_uninit(fdset);
+err_epoll:
+       close(fdset->epfd);
err_free:
        rte_free(fdset);
err_unlock:
@@ -330,78 +158,99 @@ fdset_init(const char *name)
/**
  * Register the fd in the fdset with read/write handler and context.
  */
-static int
-fdset_add_no_sync(struct fdset *pfdset, int fd, fd_cb rcb, fd_cb wcb, void 
*dat)
+int
+fdset_add(struct fdset *pfdset, int fd, fd_cb rcb, fd_cb wcb, void *dat)
{
-       int i;
+       struct fdentry *pfdentry;
+       struct epoll_event ev;

        if (pfdset == NULL || fd == -1)
                return -1;

        pthread_mutex_lock(&pfdset->fd_mutex);
-       i = pfdset->num < MAX_FDS ? pfdset->num++ : -1;
-       if (i == -1) {
-               pthread_mutex_lock(&pfdset->fd_polling_mutex);
-               fdset_shrink_nolock(pfdset);
-               pthread_mutex_unlock(&pfdset->fd_polling_mutex);
-               i = pfdset->num < MAX_FDS ? pfdset->num++ : -1;
-               if (i == -1) {
-                       pthread_mutex_unlock(&pfdset->fd_mutex);
-                       return -2;
-               }
+       if (pfdset->next_free_idx >= (int)RTE_DIM(pfdset->fd)) {
+               pthread_mutex_unlock(&pfdset->fd_mutex);
+               return -2;
        }

-       fdset_add_fd(pfdset, i, fd, rcb, wcb, dat);
+       pfdentry = &pfdset->fd[pfdset->next_free_idx];
+       pfdentry->fd  = fd;
+       pfdentry->rcb = rcb;
+       pfdentry->wcb = wcb;
+       pfdentry->dat = dat;
+
+       LIST_INSERT_HEAD(&pfdset->fdlist, pfdentry, next);
+
+       /* Find next free slot */
+       pfdset->next_free_idx++;
+       for (; pfdset->next_free_idx < (int)RTE_DIM(pfdset->fd); 
pfdset->next_free_idx++) {
+               if (pfdset->fd[pfdset->next_free_idx].fd != -1)
+                       continue;
+               break;
+       }
        pthread_mutex_unlock(&pfdset->fd_mutex);

+       ev.events = EPOLLERR;
+       ev.events |= rcb ? EPOLLIN : 0;
+       ev.events |= wcb ? EPOLLOUT : 0;
+       ev.data.fd = fd;
+
+       if (epoll_ctl(pfdset->epfd, EPOLL_CTL_ADD, fd, &ev) == -1)
+               VHOST_FDMAN_LOG(ERR, "could not add %d fd to %d epfd: %s",
+                       fd, pfdset->epfd, strerror(errno));

We should not return 0 if this epoll_ctl() call fails, right?

Right, in the upcoming revision I'm improving the error handling by doing
some refactoring of the fdset management.


+
        return 0;
}

-int
-fdset_add(struct fdset *pfdset, int fd, fd_cb rcb, fd_cb wcb, void *dat)
+static struct fdentry *
+fdset_find_entry_locked(struct fdset *pfdset, int fd)
{
-       int ret;
+       struct fdentry *pfdentry;

-       ret = fdset_add_no_sync(pfdset, fd, rcb, wcb, dat);
-       if (ret < 0)
-               return ret;
+       LIST_FOREACH(pfdentry, &pfdset->fdlist, next) {
+               if (pfdentry->fd != fd)
+                       continue;
+               return pfdentry;
+       }

-       fdset_sync(pfdset);
+       return NULL;
+}

-       return 0;
+static void
+fdset_del_locked(struct fdset *pfdset, struct fdentry *pfdentry)
+{
+       int entry_idx;
+
+       if (epoll_ctl(pfdset->epfd, EPOLL_CTL_DEL, pfdentry->fd, NULL) == -1)
+               VHOST_FDMAN_LOG(ERR, "could not remove %d fd from %d epfd: %s",
+                       pfdentry->fd, pfdset->epfd, strerror(errno));
+
+       pfdentry->fd = -1;
+       pfdentry->rcb = pfdentry->wcb = NULL;
+       pfdentry->dat = NULL;
+       entry_idx = pfdentry - pfdset->fd;
+       if (entry_idx < pfdset->next_free_idx)
+               pfdset->next_free_idx = entry_idx;
+       LIST_REMOVE(pfdentry, next);
}

-/**
- *  Unregister the fd from the fdset.
- *  Returns context of a given fd or NULL.
- */
-void *
+void
fdset_del(struct fdset *pfdset, int fd)
{
-       int i;
-       void *dat = NULL;
+       struct fdentry *pfdentry;

        if (pfdset == NULL || fd == -1)
-               return NULL;
+               return;

        do {
                pthread_mutex_lock(&pfdset->fd_mutex);
-
-               i = fdset_find_fd(pfdset, fd);
-               if (i != -1 && pfdset->fd[i].busy == 0) {
-                       /* busy indicates r/wcb is executing! */
-                       dat = pfdset->fd[i].dat;
-                       pfdset->fd[i].fd = -1;
-                       pfdset->fd[i].rcb = pfdset->fd[i].wcb = NULL;
-                       pfdset->fd[i].dat = NULL;
-                       i = -1;
+               pfdentry = fdset_find_entry_locked(pfdset, fd);
+               if (pfdentry != NULL && pfdentry->busy == 0) {
+                       fdset_del_locked(pfdset, pfdentry);
+                       pfdentry = NULL;
                }
                pthread_mutex_unlock(&pfdset->fd_mutex);
-       } while (i != -1);
-
-       fdset_sync(pfdset);
-
-       return dat;
+       } while (pfdentry != NULL);
}

/**
@@ -415,28 +264,22 @@ fdset_del(struct fdset *pfdset, int fd)
int
fdset_try_del(struct fdset *pfdset, int fd)
{
-       int i;
+       struct fdentry *pfdentry;

        if (pfdset == NULL || fd == -1)
                return -2;

        pthread_mutex_lock(&pfdset->fd_mutex);
-       i = fdset_find_fd(pfdset, fd);
-       if (i != -1 && pfdset->fd[i].busy) {
+       pfdentry = fdset_find_entry_locked(pfdset, fd);
+       if (pfdentry != NULL && pfdentry->busy != 0) {
                pthread_mutex_unlock(&pfdset->fd_mutex);
                return -1;
        }

-       if (i != -1) {
-               pfdset->fd[i].fd = -1;
-               pfdset->fd[i].rcb = pfdset->fd[i].wcb = NULL;
-               pfdset->fd[i].dat = NULL;
-       }
+       if (pfdentry != NULL)
+               fdset_del_locked(pfdset, pfdentry);

        pthread_mutex_unlock(&pfdset->fd_mutex);
-
-       fdset_sync(pfdset);
-
        return 0;
}

@@ -453,53 +296,29 @@ static uint32_t
fdset_event_dispatch(void *arg)
{
        int i;
-       struct pollfd *pfd;
-       struct fdentry *pfdentry;
        fd_cb rcb, wcb;
        void *dat;
        int fd, numfds;
        int remove1, remove2;
-       int need_shrink;
        struct fdset *pfdset = arg;
-       int val;

        if (pfdset == NULL)
                return 0;

        while (1) {
+               struct epoll_event events[MAX_FDS];
+               struct fdentry *pfdentry;

-               /*
-                * When poll is blocked, other threads might unregister
-                * listenfds from and register new listenfds into fdset.
-                * When poll returns, the entries for listenfds in the fdset
-                * might have been updated. It is ok if there is unwanted call
-                * for new listenfds.
-                */
-               pthread_mutex_lock(&pfdset->fd_mutex);
-               numfds = pfdset->num;
-               pthread_mutex_unlock(&pfdset->fd_mutex);
-
-               pthread_mutex_lock(&pfdset->fd_polling_mutex);
-               val = poll(pfdset->rwfds, numfds, 1000 /* millisecs */);
-               pthread_mutex_unlock(&pfdset->fd_polling_mutex);
-               if (val < 0)
+               numfds = epoll_wait(pfdset->epfd, events, RTE_DIM(events), 
1000);
+               if (numfds < 0)
                        continue;

-               need_shrink = 0;
                for (i = 0; i < numfds; i++) {
                        pthread_mutex_lock(&pfdset->fd_mutex);

-                       pfdentry = &pfdset->fd[i];
-                       fd = pfdentry->fd;
-                       pfd = &pfdset->rwfds[i];
-
-                       if (fd < 0) {
-                               need_shrink = 1;
-                               pthread_mutex_unlock(&pfdset->fd_mutex);
-                               continue;
-                       }
-
-                       if (!pfd->revents) {
+                       fd = events[i].data.fd;
+                       pfdentry = fdset_find_entry_locked(pfdset, fd);
+                       if (pfdentry == NULL) {
                                pthread_mutex_unlock(&pfdset->fd_mutex);
                                continue;
                        }
@@ -513,9 +332,9 @@ fdset_event_dispatch(void *arg)

                        pthread_mutex_unlock(&pfdset->fd_mutex);

-                       if (rcb && pfd->revents & (POLLIN | FDPOLLERR))
+                       if (rcb && events[i].events & (EPOLLIN | EPOLLERR | 
EPOLLHUP))
                                rcb(fd, dat, &remove1);
-                       if (wcb && pfd->revents & (POLLOUT | FDPOLLERR))
+                       if (wcb && events[i].events & (EPOLLOUT | EPOLLERR | 
EPOLLHUP))
                                wcb(fd, dat, &remove2);
                        pfdentry->busy = 0;
                        /*
@@ -524,23 +343,13 @@ fdset_event_dispatch(void *arg)
                         * directly.
                         */
                        /*
-                        * When we are to clean up the fd from fdset,
-                        * because the fd is closed in the cb,
-                        * the old fd val could be reused by when creates new
-                        * listen fd in another thread, we couldn't call
-                        * fdset_del.
+                        * A concurrent fdset_del may have been waiting for the
+                        * fdentry not to be busy, so we can't call
+                        * fdset_del_locked().
                         */
-                       if (remove1 || remove2) {
-                               pfdentry->fd = -1;
-                               need_shrink = 1;
-                       }
+                       if (remove1 || remove2)
+                               fdset_del(pfdset, fd);
                }
-
-               if (need_shrink)
-                       fdset_shrink(pfdset);
-
-               if (pfdset->destroy)
-                       break;

I guess we want to keep the destroy logic here.

Right, good catch!
Adding it back.

Thanks for the review,
Maxime
Thanks,
Chenbo

        }

        return 0;
diff --git a/lib/vhost/fd_man.h b/lib/vhost/fd_man.h
index 079fa0155f..6398343a6a 100644
--- a/lib/vhost/fd_man.h
+++ b/lib/vhost/fd_man.h
@@ -6,7 +6,7 @@
#define _FD_MAN_H_
#include <pthread.h>
#include <poll.h>
-#include <stdbool.h>
+#include <sys/queue.h>

struct fdset;

@@ -19,8 +19,7 @@ struct fdset *fdset_init(const char *name);
int fdset_add(struct fdset *pfdset, int fd,
        fd_cb rcb, fd_cb wcb, void *dat);

-void *fdset_del(struct fdset *pfdset, int fd);
-
+void fdset_del(struct fdset *pfdset, int fd);
int fdset_try_del(struct fdset *pfdset, int fd);

#endif
--
2.44.0



Reply via email to