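This patch heavily reworks fdset initialization:
 - fdsets are now dynamically allocated by the FD manager, and registered
   by name so that fdset_init() returns the existing fdset when called
   again with the same name
 - the event dispatch thread is now created by the FD manager
 - struct fdset is now opaque to VDUSE and Vhost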

Signed-off-by: Maxime Coquelin <maxime.coque...@redhat.com>
---
 lib/vhost/fd_man.c      | 177 +++++++++++--
 lib/vhost/fd_man.c.orig | 538 ++++++++++++++++++++++++++++++++++++++++
 lib/vhost/fd_man.h      |  39 +--
 lib/vhost/socket.c      |  24 +-
 lib/vhost/vduse.c       |  29 +--
 5 files changed, 715 insertions(+), 92 deletions(-)
 create mode 100644 lib/vhost/fd_man.c.orig

diff --git a/lib/vhost/fd_man.c b/lib/vhost/fd_man.c
index 0ae481b785..8b47c97d45 100644
--- a/lib/vhost/fd_man.c
+++ b/lib/vhost/fd_man.c
@@ -3,12 +3,16 @@
  */
 
 #include <errno.h>
+#include <pthread.h>
 #include <stdio.h>
 #include <string.h>
 #include <unistd.h>
 
 #include <rte_common.h>
 #include <rte_log.h>
+#include <rte_malloc.h>
+#include <rte_string_fns.h>
+#include <rte_thread.h>
 
 #include "fd_man.h"
 
@@ -19,6 +23,95 @@ RTE_LOG_REGISTER_SUFFIX(vhost_fdset_logtype, fdset, INFO);
 
 #define FDPOLLERR (POLLERR | POLLHUP | POLLNVAL)
 
+/* One registered file descriptor and the callbacks run on its events. */
+struct fdentry {
+       int fd;         /* -1 indicates this entry is empty */
+       fd_cb rcb;      /* callback when this fd is readable. */
+       fd_cb wcb;      /* callback when this fd is writeable.*/
+       void *dat;      /* fd context */
+       int busy;       /* whether this entry is being used in cb. */
+};
+
+/*
+ * A named set of polled file descriptors served by its own event
+ * dispatch thread. Only fd_man.c sees the layout; users hold the
+ * pointer returned by fdset_init().
+ */
+struct fdset {
+       char name[RTE_THREAD_NAME_SIZE];        /* registry key and dispatch thread name */
+       struct pollfd rwfds[MAX_FDS];
+       struct fdentry fd[MAX_FDS];
+       rte_thread_t tid;       /* event dispatch thread */
+       pthread_mutex_t fd_mutex;
+       pthread_mutex_t fd_polling_mutex;
+       int num;        /* current fd number of this fdset */
+
+       union pipefds {
+               struct {
+                       int pipefd[2];
+               };
+               struct {
+                       int readfd;
+                       int writefd;
+               };
+       } u;
+
+       pthread_mutex_t sync_mutex;
+       pthread_cond_t sync_cond;
+       bool sync;
+       bool destroy;   /* when set, the dispatch thread leaves its loop */
+};
+
+static int fdset_add_no_sync(struct fdset *pfdset, int fd, fd_cb rcb, fd_cb wcb, void *dat);
+static uint32_t fdset_event_dispatch(void *arg);
+
+#define MAX_FDSETS 8
+
+/* Registry of named fdsets; accessed with fdsets_mutex held. */
+static struct fdset *fdsets[MAX_FDSETS];
+static pthread_mutex_t fdsets_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+/*
+ * Return the registered fdset whose name matches @name (compared over
+ * RTE_THREAD_NAME_SIZE bytes), or NULL. Called with fdsets_mutex held.
+ */
+static struct fdset *
+fdset_lookup(const char *name)
+{
+       int i;
+
+       for (i = 0; i < MAX_FDSETS; i++) {
+               struct fdset *fdset = fdsets[i];
+               if (fdset == NULL)
+                       continue;
+
+               if (!strncmp(fdset->name, name, RTE_THREAD_NAME_SIZE))
+                       return fdset;
+       }
+
+       return NULL;
+}
+
+/*
+ * Store @fdset in the first free registry slot.
+ * Returns 0 on success, -1 when all MAX_FDSETS slots are taken.
+ * Called with fdsets_mutex held.
+ */
+static int
+fdset_insert(struct fdset *fdset)
+{
+       int i;
+
+       for (i = 0; i < MAX_FDSETS; i++) {
+               if (fdsets[i] == NULL) {
+                       fdsets[i] = fdset;
+                       return 0;
+               }
+       }
+
+       return -1;
+}
+
 static void
 fdset_pipe_read_cb(int readfd, void *dat,
                   int *remove __rte_unused)
@@ -63,7 +140,7 @@ fdset_pipe_init(struct fdset *fdset)
                return -1;
        }
 
-       ret = fdset_add(fdset, fdset->u.readfd,
+       ret = fdset_add_no_sync(fdset, fdset->u.readfd,
                        fdset_pipe_read_cb, NULL, fdset);
        if (ret < 0) {
                VHOST_FDMAN_LOG(ERR,
@@ -179,37 +256,110 @@ fdset_add_fd(struct fdset *pfdset, int idx, int fd,
        pfd->revents = 0;
 }
 
-void
-fdset_uninit(struct fdset *pfdset)
-{
-       fdset_pipe_uninit(pfdset);
-}
-
-int
-fdset_init(struct fdset *pfdset)
+/*
+ * Return the fdset registered as @name, creating it on first use.
+ *
+ * Creation allocates the fdset, sets up its internal pipe, starts a
+ * dispatch thread named after the fdset running fdset_event_dispatch(),
+ * and records the fdset so later calls with the same name return it.
+ *
+ * Returns NULL if @name is NULL or does not fit in RTE_THREAD_NAME_SIZE,
+ * or if any creation step fails.
+ */
+struct fdset *
+fdset_init(const char *name)
 {
+       struct fdset *fdset;
+       uint32_t val;
        int i;
 
-       if (pfdset == NULL)
-               return -1;
+       if (name == NULL) {
+               VHOST_FDMAN_LOG(ERR, "Invalid name");
+               goto err;
+       }
+
+       /*
+        * fdset->name holds at most RTE_THREAD_NAME_SIZE - 1 characters. A
+        * longer name would be truncated on copy, never match again in
+        * fdset_lookup(), and each call would create another fdset + thread.
+        */
+       if (strnlen(name, RTE_THREAD_NAME_SIZE) >= RTE_THREAD_NAME_SIZE) {
+               VHOST_FDMAN_LOG(ERR, "fdset name %s is too long", name);
+               goto err;
+       }
 
-       pthread_mutex_init(&pfdset->fd_mutex, NULL);
-       pthread_mutex_init(&pfdset->fd_polling_mutex, NULL);
+       pthread_mutex_lock(&fdsets_mutex);
+       fdset = fdset_lookup(name);
+       if (fdset) {
+               pthread_mutex_unlock(&fdsets_mutex);
+               return fdset;
+       }
+
+       fdset = rte_zmalloc(NULL, sizeof(*fdset), 0);
+       if (!fdset) {
+               VHOST_FDMAN_LOG(ERR, "Failed to alloc fdset %s", name);
+               goto err_unlock;
+       }
+
+       rte_strscpy(fdset->name, name, RTE_THREAD_NAME_SIZE);
+
+       pthread_mutex_init(&fdset->fd_mutex, NULL);
+       pthread_mutex_init(&fdset->fd_polling_mutex, NULL);
 
        for (i = 0; i < MAX_FDS; i++) {
-               pfdset->fd[i].fd = -1;
-               pfdset->fd[i].dat = NULL;
+               fdset->fd[i].fd = -1;
+               fdset->fd[i].dat = NULL;
        }
-       pfdset->num = 0;
+       fdset->num = 0;
 
-       return fdset_pipe_init(pfdset);
+       if (fdset_pipe_init(fdset)) {
+               VHOST_FDMAN_LOG(ERR, "Failed to init pipe for %s", name);
+               goto err_free;
+       }
+
+       if (rte_thread_create_internal_control(&fdset->tid, fdset->name,
+                                       fdset_event_dispatch, fdset)) {
+               VHOST_FDMAN_LOG(ERR, "Failed to create %s event dispatch thread",
+                               fdset->name);
+               goto err_pipe;
+       }
+
+       if (fdset_insert(fdset)) {
+               VHOST_FDMAN_LOG(ERR, "Failed to insert fdset %s", name);
+               goto err_thread;
+       }
+
+       pthread_mutex_unlock(&fdsets_mutex);
+
+       return fdset;
+
+err_thread:
+       /* Make the dispatch loop exit: it tests destroy after each iteration. */
+       fdset->destroy = true;
+       fdset_sync(fdset);
+       rte_thread_join(fdset->tid, &val);
+err_pipe:
+       /*
+        * NOTE(review): no dispatch thread runs past this point. If
+        * fdset_pipe_uninit() calls fdset_del() and fdset_del() waits in
+        * fdset_sync(), this cleanup blocks forever; confirm.
+        */
+       fdset_pipe_uninit(fdset);
+err_free:
+       pthread_mutex_destroy(&fdset->fd_polling_mutex);
+       pthread_mutex_destroy(&fdset->fd_mutex);
+       rte_free(fdset);
+err_unlock:
+       pthread_mutex_unlock(&fdsets_mutex);
+err:
+       return NULL;
 }
 
 /**
  * Register the fd in the fdset with read/write handler and context.
  */
-int
-fdset_add(struct fdset *pfdset, int fd, fd_cb rcb, fd_cb wcb, void *dat)
+static int
+fdset_add_no_sync(struct fdset *pfdset, int fd, fd_cb rcb, fd_cb wcb, void 
*dat)
 {
        int i;
 
@@ -232,6 +354,18 @@ fdset_add(struct fdset *pfdset, int fd, fd_cb rcb, fd_cb 
wcb, void *dat)
        fdset_add_fd(pfdset, i, fd, rcb, wcb, dat);
        pthread_mutex_unlock(&pfdset->fd_mutex);
 
+       return 0;
+}
+
+int
+fdset_add(struct fdset *pfdset, int fd, fd_cb rcb, fd_cb wcb, void *dat)
+{
+       int ret;
+
+       ret = fdset_add_no_sync(pfdset, fd, rcb, wcb, dat);
+       if (ret < 0)
+               return ret;
+
        fdset_sync(pfdset);
 
        return 0;
@@ -315,7 +449,7 @@ fdset_try_del(struct fdset *pfdset, int fd)
  * will wait until the flag is reset to zero(which indicates the callback is
  * finished), then it could free the context after fdset_del.
  */
-uint32_t
+static uint32_t
 fdset_event_dispatch(void *arg)
 {
        int i;
@@ -404,6 +538,9 @@ fdset_event_dispatch(void *arg)
 
                if (need_shrink)
                        fdset_shrink(pfdset);
+
+               if (pfdset->destroy)
+                       break;
        }
 
        return 0;
diff --git a/lib/vhost/fd_man.h b/lib/vhost/fd_man.h
index c18e3a435c..079fa0155f 100644
--- a/lib/vhost/fd_man.h
+++ b/lib/vhost/fd_man.h
@@ -8,50 +8,23 @@
 #include <poll.h>
 #include <stdbool.h>
 
+/* Opaque: the layout is private to fd_man.c. */
+struct fdset;
+
 #define MAX_FDS 1024
 
 typedef void (*fd_cb)(int fd, void *dat, int *remove);
 
-struct fdentry {
-       int fd;         /* -1 indicates this entry is empty */
-       fd_cb rcb;      /* callback when this fd is readable. */
-       fd_cb wcb;      /* callback when this fd is writeable.*/
-       void *dat;      /* fd context */
-       int busy;       /* whether this entry is being used in cb. */
-};
-
-struct fdset {
-       struct pollfd rwfds[MAX_FDS];
-       struct fdentry fd[MAX_FDS];
-       pthread_mutex_t fd_mutex;
-       pthread_mutex_t fd_polling_mutex;
-       int num;        /* current fd number of this fdset */
-
-       union pipefds {
-               struct {
-                       int pipefd[2];
-               };
-               struct {
-                       int readfd;
-                       int writefd;
-               };
-       } u;
-
-       pthread_mutex_t sync_mutex;
-       pthread_cond_t sync_cond;
-       bool sync;
-};
-
-void fdset_uninit(struct fdset *pfdset);
-
-int fdset_init(struct fdset *pfdset);
+/*
+ * Return the fdset named @name, creating it and its event dispatch
+ * thread on first use. Returns NULL on failure.
+ */
+struct fdset *fdset_init(const char *name);
 
 int fdset_add(struct fdset *pfdset, int fd,
        fd_cb rcb, fd_cb wcb, void *dat);
 
 void *fdset_del(struct fdset *pfdset, int fd);
 int fdset_try_del(struct fdset *pfdset, int fd);
-
-uint32_t fdset_event_dispatch(void *arg);
 
 #endif
diff --git a/lib/vhost/socket.c b/lib/vhost/socket.c
index 2f93d48c31..9eebc63479 100644
--- a/lib/vhost/socket.c
+++ b/lib/vhost/socket.c
@@ -76,7 +76,7 @@ struct vhost_user_connection {
 #define MAX_VHOST_SOCKET 1024
 struct vhost_user {
        struct vhost_user_socket *vsockets[MAX_VHOST_SOCKET];
-       struct fdset fdset;
+       struct fdset *fdset;    /* from fdset_init("vhost-evt") in rte_vhost_driver_start() */
        int vsocket_cnt;
        pthread_mutex_t mutex;
 };
@@ -261,7 +261,7 @@ vhost_user_add_connection(int fd, struct vhost_user_socket 
*vsocket)
        conn->connfd = fd;
        conn->vsocket = vsocket;
        conn->vid = vid;
-       ret = fdset_add(&vhost_user.fdset, fd, vhost_user_read_cb,
+       ret = fdset_add(vhost_user.fdset, fd, vhost_user_read_cb,
                        NULL, conn);
        if (ret < 0) {
                VHOST_CONFIG_LOG(vsocket->path, ERR,
@@ -394,7 +394,7 @@ vhost_user_start_server(struct vhost_user_socket *vsocket)
        if (ret < 0)
                goto err;
 
-       ret = fdset_add(&vhost_user.fdset, fd, vhost_user_server_new_connection,
+       ret = fdset_add(vhost_user.fdset, fd, vhost_user_server_new_connection,
                  NULL, vsocket);
        if (ret < 0) {
                VHOST_CONFIG_LOG(path, ERR, "failed to add listen fd %d to 
vhost server fdset",
@@ -1079,7 +1079,7 @@ rte_vhost_driver_unregister(const char *path)
                         * mutex lock, and try again since the r/wcb
                         * may use the mutex lock.
                         */
-                       if (fdset_try_del(&vhost_user.fdset, 
vsocket->socket_fd) == -1) {
+                       if (fdset_try_del(vhost_user.fdset, vsocket->socket_fd) 
== -1) {
                                pthread_mutex_unlock(&vhost_user.mutex);
                                goto again;
                        }
@@ -1099,7 +1099,7 @@ rte_vhost_driver_unregister(const char *path)
                         * try again since the r/wcb may use the
                         * conn_mutex and mutex locks.
                         */
-                       if (fdset_try_del(&vhost_user.fdset,
+                       if (fdset_try_del(vhost_user.fdset,
                                          conn->connfd) == -1) {
                                pthread_mutex_unlock(&vsocket->conn_mutex);
                                pthread_mutex_unlock(&vhost_user.mutex);
@@ -1167,7 +1167,6 @@ int
 rte_vhost_driver_start(const char *path)
 {
        struct vhost_user_socket *vsocket;
-       static rte_thread_t fdset_tid;
 
        pthread_mutex_lock(&vhost_user.mutex);
        vsocket = find_vhost_user_socket(path);
@@ -1179,19 +1178,12 @@ rte_vhost_driver_start(const char *path)
        if (vsocket->is_vduse)
                return vduse_device_create(path, 
vsocket->net_compliant_ol_flags);
 
-       if (fdset_tid.opaque_id == 0) {
-               if (fdset_init(&vhost_user.fdset) < 0) {
+       if (vhost_user.fdset == NULL) {
+               vhost_user.fdset = fdset_init("vhost-evt");
+               if (vhost_user.fdset == NULL) {
                        VHOST_CONFIG_LOG(path, ERR, "Failed to init Vhost-user 
fdset");
                        return -1;
                }
-
-               int ret = rte_thread_create_internal_control(&fdset_tid,
-                               "vhost-evt", fdset_event_dispatch, 
&vhost_user.fdset);
-               if (ret != 0) {
-                       VHOST_CONFIG_LOG(path, ERR, "failed to create fdset 
handling thread");
-                       fdset_uninit(&vhost_user.fdset);
-                       return -1;
-               }
        }
 
        if (vsocket->is_server)
diff --git a/lib/vhost/vduse.c b/lib/vhost/vduse.c
index 257285a89f..ef2573bdf0 100644
--- a/lib/vhost/vduse.c
+++ b/lib/vhost/vduse.c
@@ -28,13 +28,11 @@
 #define VDUSE_CTRL_PATH "/dev/vduse/control"
 
 struct vduse {
-       struct fdset fdset;
+       struct fdset *fdset;    /* from fdset_init("vduse-evt") in vduse_device_create() */
 };
 
 static struct vduse vduse;
 
-static bool vduse_events_thread;
-
 static const char * const vduse_reqs_str[] = {
        "VDUSE_GET_VQ_STATE",
        "VDUSE_SET_STATUS",
@@ -215,7 +213,7 @@ vduse_vring_setup(struct virtio_net *dev, unsigned int 
index)
        }
 
        if (vq == dev->cvq) {
-               ret = fdset_add(&vduse.fdset, vq->kickfd, 
vduse_control_queue_event, NULL, dev);
+               ret = fdset_add(vduse.fdset, vq->kickfd, 
vduse_control_queue_event, NULL, dev);
                if (ret) {
                        VHOST_CONFIG_LOG(dev->ifname, ERR,
                                        "Failed to setup kickfd handler for VQ 
%u: %s",
@@ -238,7 +236,7 @@ vduse_vring_cleanup(struct virtio_net *dev, unsigned int 
index)
        int ret;
 
        if (vq == dev->cvq && vq->kickfd >= 0)
-               fdset_del(&vduse.fdset, vq->kickfd);
+               fdset_del(vduse.fdset, vq->kickfd);
 
        vq_efd.index = index;
        vq_efd.fd = VDUSE_EVENTFD_DEASSIGN;
@@ -413,7 +411,6 @@ int
 vduse_device_create(const char *path, bool compliant_ol_flags)
 {
        int control_fd, dev_fd, vid, ret;
-       rte_thread_t fdset_tid;
        uint32_t i, max_queue_pairs, total_queues;
        struct virtio_net *dev;
        struct virtio_net_config vnet_config = {{ 0 }};
@@ -422,22 +419,12 @@ vduse_device_create(const char *path, bool 
compliant_ol_flags)
        struct vduse_dev_config *dev_config = NULL;
        const char *name = path + strlen("/dev/vduse/");
 
-       /* If first device, create events dispatcher thread */
-       if (vduse_events_thread == false) {
-               if (fdset_init(&vduse.fdset) < 0) {
+       if (vduse.fdset == NULL) {
+               vduse.fdset = fdset_init("vduse-evt");
+               if (vduse.fdset == NULL) {
                        VHOST_CONFIG_LOG(path, ERR, "Failed to init VDUSE 
fdset");
                        return -1;
                }
-
-               ret = rte_thread_create_internal_control(&fdset_tid, 
"vduse-evt",
-                               fdset_event_dispatch, &vduse.fdset);
-               if (ret != 0) {
-                       VHOST_CONFIG_LOG(path, ERR, "failed to create vduse 
fdset handling thread");
-                       fdset_uninit(&vduse.fdset);
-                       return -1;
-               }
-
-               vduse_events_thread = true;
        }
 
        control_fd = open(VDUSE_CTRL_PATH, O_RDWR);
@@ -555,7 +542,7 @@ vduse_device_create(const char *path, bool 
compliant_ol_flags)
 
        dev->cvq = dev->virtqueue[max_queue_pairs * 2];
 
-       ret = fdset_add(&vduse.fdset, dev->vduse_dev_fd, vduse_events_handler, 
NULL, dev);
+       ret = fdset_add(vduse.fdset, dev->vduse_dev_fd, vduse_events_handler, 
NULL, dev);
        if (ret) {
                VHOST_CONFIG_LOG(name, ERR, "Failed to add fd %d to vduse 
fdset",
                                dev->vduse_dev_fd);
@@ -602,7 +589,7 @@ vduse_device_destroy(const char *path)
 
        vduse_device_stop(dev);
 
-       fdset_del(&vduse.fdset, dev->vduse_dev_fd);
+       fdset_del(vduse.fdset, dev->vduse_dev_fd);
 
        if (dev->vduse_dev_fd >= 0) {
                close(dev->vduse_dev_fd);
-- 
2.43.2

Reply via email to