The socket.c file serves two purposes:
1. librte_vhost public API entry points, e.g. rte_vhost_driver_register().
2. AF_UNIX socket management.

Move AF_UNIX socket code into trans_af_unix.c so that socket.c only
handles the librte_vhost public API entry points.  This will make it
possible to support other transports besides AF_UNIX.

This patch is a preparatory step that simply moves code from socket.c to
trans_af_unix.c unmodified, besides dropping 'static' qualifiers where
necessary because socket.c now calls into trans_af_unix.c.

A lot of socket.c state is exposed in vhost.h but this is a temporary
measure and will be cleaned up in later patches.  By simply moving code
unmodified in this patch it will be easier to review the actual
refactoring that follows.

Signed-off-by: Stefan Hajnoczi <stefa...@redhat.com>
---
 lib/librte_vhost/vhost.h         |  65 +++++
 lib/librte_vhost/socket.c        | 501 +--------------------------------------
 lib/librte_vhost/trans_af_unix.c | 451 +++++++++++++++++++++++++++++++++++
 3 files changed, 517 insertions(+), 500 deletions(-)

diff --git a/lib/librte_vhost/vhost.h b/lib/librte_vhost/vhost.h
index 53811a8b1..8c6d6e524 100644
--- a/lib/librte_vhost/vhost.h
+++ b/lib/librte_vhost/vhost.h
@@ -5,6 +5,7 @@
 #ifndef _VHOST_NET_CDEV_H_
 #define _VHOST_NET_CDEV_H_
 #include <stdint.h>
+#include <stdbool.h>
 #include <stdio.h>
 #include <sys/types.h>
 #include <sys/queue.h>
@@ -12,12 +13,15 @@
 #include <linux/vhost.h>
 #include <linux/virtio_net.h>
 #include <sys/socket.h>
+#include <sys/un.h> /* TODO remove when trans_af_unix.c refactoring is done */
 #include <linux/if.h>
+#include <pthread.h>
 
 #include <rte_log.h>
 #include <rte_ether.h>
 #include <rte_rwlock.h>
 
+#include "fd_man.h"
 #include "rte_vhost.h"
 
 /* Used to indicate that the device is running on a data core */
@@ -259,6 +263,67 @@ struct virtio_net {
        int                     slave_req_fd;
 } __rte_cache_aligned;
 
+/* The vhost_user, vhost_user_socket, vhost_user_connection, and reconnect
+ * declarations are temporary measures for moving AF_UNIX code into
+ * trans_af_unix.c.  They will be cleaned up as socket.c is untangled from
+ * trans_af_unix.c.
+ */
+TAILQ_HEAD(vhost_user_connection_list, vhost_user_connection);
+
+/*
+ * Every time rte_vhost_driver_register() is invoked, an associated
+ * vhost_user_socket struct will be created.
+ */
+struct vhost_user_socket {
+       struct vhost_user_connection_list conn_list;
+       pthread_mutex_t conn_mutex;
+       char *path;
+       int socket_fd;
+       struct sockaddr_un un;
+       bool is_server;
+       bool reconnect;
+       bool dequeue_zero_copy;
+       bool iommu_support;
+
+       /*
+        * The "supported_features" indicates the feature bits the
+        * vhost driver supports. The "features" indicates the feature
+        * bits after the rte_vhost_driver_features_disable/enable().
+        * It is also the final feature bits used for vhost-user
+        * features negotiation.
+        */
+       uint64_t supported_features;
+       uint64_t features;
+
+       struct vhost_device_ops const *notify_ops;
+};
+
+struct vhost_user_connection {
+       struct vhost_user_socket *vsocket;
+       int connfd;
+       int vid;
+
+       TAILQ_ENTRY(vhost_user_connection) next;
+};
+
+#define MAX_VHOST_SOCKET 1024
+struct vhost_user {
+       struct vhost_user_socket *vsockets[MAX_VHOST_SOCKET];
+       struct fdset fdset;
+       int vsocket_cnt;
+       pthread_mutex_t mutex;
+};
+
+extern struct vhost_user vhost_user;
+
+int create_unix_socket(struct vhost_user_socket *vsocket);
+int vhost_user_start_server(struct vhost_user_socket *vsocket);
+int vhost_user_start_client(struct vhost_user_socket *vsocket);
+
+extern pthread_t reconn_tid;
+
+int vhost_user_reconnect_init(void);
+bool vhost_user_remove_reconnect(struct vhost_user_socket *vsocket);
 
 #define VHOST_LOG_PAGE 4096
 
diff --git a/lib/librte_vhost/socket.c b/lib/librte_vhost/socket.c
index 6e3857e7a..d681f9cae 100644
--- a/lib/librte_vhost/socket.c
+++ b/lib/librte_vhost/socket.c
@@ -4,17 +4,14 @@
 
 #include <stdint.h>
 #include <stdio.h>
-#include <stdbool.h>
 #include <limits.h>
 #include <stdlib.h>
 #include <unistd.h>
 #include <string.h>
 #include <sys/types.h>
 #include <sys/socket.h>
-#include <sys/un.h>
 #include <sys/queue.h>
 #include <errno.h>
-#include <fcntl.h>
 #include <pthread.h>
 
 #include <rte_log.h>
@@ -23,61 +20,7 @@
 #include "vhost.h"
 #include "vhost_user.h"
 
-
-TAILQ_HEAD(vhost_user_connection_list, vhost_user_connection);
-
-/*
- * Every time rte_vhost_driver_register() is invoked, an associated
- * vhost_user_socket struct will be created.
- */
-struct vhost_user_socket {
-       struct vhost_user_connection_list conn_list;
-       pthread_mutex_t conn_mutex;
-       char *path;
-       int socket_fd;
-       struct sockaddr_un un;
-       bool is_server;
-       bool reconnect;
-       bool dequeue_zero_copy;
-       bool iommu_support;
-
-       /*
-        * The "supported_features" indicates the feature bits the
-        * vhost driver supports. The "features" indicates the feature
-        * bits after the rte_vhost_driver_features_disable/enable().
-        * It is also the final feature bits used for vhost-user
-        * features negotiation.
-        */
-       uint64_t supported_features;
-       uint64_t features;
-
-       struct vhost_device_ops const *notify_ops;
-};
-
-struct vhost_user_connection {
-       struct vhost_user_socket *vsocket;
-       int connfd;
-       int vid;
-
-       TAILQ_ENTRY(vhost_user_connection) next;
-};
-
-#define MAX_VHOST_SOCKET 1024
-struct vhost_user {
-       struct vhost_user_socket *vsockets[MAX_VHOST_SOCKET];
-       struct fdset fdset;
-       int vsocket_cnt;
-       pthread_mutex_t mutex;
-};
-
-#define MAX_VIRTIO_BACKLOG 128
-
-static void vhost_user_server_new_connection(int fd, void *data, int *remove);
-static void vhost_user_read_cb(int fd, void *dat, int *remove);
-static int create_unix_socket(struct vhost_user_socket *vsocket);
-static int vhost_user_start_client(struct vhost_user_socket *vsocket);
-
-static struct vhost_user vhost_user = {
+struct vhost_user vhost_user = {
        .fdset = {
                .fd = { [0 ... MAX_FDS - 1] = {-1, NULL, NULL, NULL, 0} },
                .fd_mutex = PTHREAD_MUTEX_INITIALIZER,
@@ -87,424 +30,6 @@ static struct vhost_user vhost_user = {
        .mutex = PTHREAD_MUTEX_INITIALIZER,
 };
 
-/* return bytes# of read on success or negative val on failure. */
-int
-read_fd_message(int sockfd, char *buf, int buflen, int *fds, int fd_num)
-{
-       struct iovec iov;
-       struct msghdr msgh;
-       size_t fdsize = fd_num * sizeof(int);
-       char control[CMSG_SPACE(fdsize)];
-       struct cmsghdr *cmsg;
-       int ret;
-
-       memset(&msgh, 0, sizeof(msgh));
-       iov.iov_base = buf;
-       iov.iov_len  = buflen;
-
-       msgh.msg_iov = &iov;
-       msgh.msg_iovlen = 1;
-       msgh.msg_control = control;
-       msgh.msg_controllen = sizeof(control);
-
-       ret = recvmsg(sockfd, &msgh, 0);
-       if (ret <= 0) {
-               RTE_LOG(ERR, VHOST_CONFIG, "recvmsg failed\n");
-               return ret;
-       }
-
-       if (msgh.msg_flags & (MSG_TRUNC | MSG_CTRUNC)) {
-               RTE_LOG(ERR, VHOST_CONFIG, "truncted msg\n");
-               return -1;
-       }
-
-       for (cmsg = CMSG_FIRSTHDR(&msgh); cmsg != NULL;
-               cmsg = CMSG_NXTHDR(&msgh, cmsg)) {
-               if ((cmsg->cmsg_level == SOL_SOCKET) &&
-                       (cmsg->cmsg_type == SCM_RIGHTS)) {
-                       memcpy(fds, CMSG_DATA(cmsg), fdsize);
-                       break;
-               }
-       }
-
-       return ret;
-}
-
-int
-send_fd_message(int sockfd, char *buf, int buflen, int *fds, int fd_num)
-{
-
-       struct iovec iov;
-       struct msghdr msgh;
-       size_t fdsize = fd_num * sizeof(int);
-       char control[CMSG_SPACE(fdsize)];
-       struct cmsghdr *cmsg;
-       int ret;
-
-       memset(&msgh, 0, sizeof(msgh));
-       iov.iov_base = buf;
-       iov.iov_len = buflen;
-
-       msgh.msg_iov = &iov;
-       msgh.msg_iovlen = 1;
-
-       if (fds && fd_num > 0) {
-               msgh.msg_control = control;
-               msgh.msg_controllen = sizeof(control);
-               cmsg = CMSG_FIRSTHDR(&msgh);
-               cmsg->cmsg_len = CMSG_LEN(fdsize);
-               cmsg->cmsg_level = SOL_SOCKET;
-               cmsg->cmsg_type = SCM_RIGHTS;
-               memcpy(CMSG_DATA(cmsg), fds, fdsize);
-       } else {
-               msgh.msg_control = NULL;
-               msgh.msg_controllen = 0;
-       }
-
-       do {
-               ret = sendmsg(sockfd, &msgh, 0);
-       } while (ret < 0 && errno == EINTR);
-
-       if (ret < 0) {
-               RTE_LOG(ERR, VHOST_CONFIG,  "sendmsg error\n");
-               return ret;
-       }
-
-       return ret;
-}
-
-static void
-vhost_user_add_connection(int fd, struct vhost_user_socket *vsocket)
-{
-       int vid;
-       size_t size;
-       struct vhost_user_connection *conn;
-       int ret;
-
-       conn = malloc(sizeof(*conn));
-       if (conn == NULL) {
-               close(fd);
-               return;
-       }
-
-       vid = vhost_new_device();
-       if (vid == -1) {
-               goto err;
-       }
-
-       size = strnlen(vsocket->path, PATH_MAX);
-       vhost_set_ifname(vid, vsocket->path, size);
-
-       if (vsocket->dequeue_zero_copy)
-               vhost_enable_dequeue_zero_copy(vid);
-
-       RTE_LOG(INFO, VHOST_CONFIG, "new device, handle is %d\n", vid);
-
-       if (vsocket->notify_ops->new_connection) {
-               ret = vsocket->notify_ops->new_connection(vid);
-               if (ret < 0) {
-                       RTE_LOG(ERR, VHOST_CONFIG,
-                               "failed to add vhost user connection with fd 
%d\n",
-                               fd);
-                       goto err;
-               }
-       }
-
-       conn->connfd = fd;
-       conn->vsocket = vsocket;
-       conn->vid = vid;
-       ret = fdset_add(&vhost_user.fdset, fd, vhost_user_read_cb,
-                       NULL, conn);
-       if (ret < 0) {
-               RTE_LOG(ERR, VHOST_CONFIG,
-                       "failed to add fd %d into vhost server fdset\n",
-                       fd);
-
-               if (vsocket->notify_ops->destroy_connection)
-                       vsocket->notify_ops->destroy_connection(conn->vid);
-
-               goto err;
-       }
-
-       pthread_mutex_lock(&vsocket->conn_mutex);
-       TAILQ_INSERT_TAIL(&vsocket->conn_list, conn, next);
-       pthread_mutex_unlock(&vsocket->conn_mutex);
-       return;
-
-err:
-       free(conn);
-       close(fd);
-}
-
-/* call back when there is new vhost-user connection from client  */
-static void
-vhost_user_server_new_connection(int fd, void *dat, int *remove __rte_unused)
-{
-       struct vhost_user_socket *vsocket = dat;
-
-       fd = accept(fd, NULL, NULL);
-       if (fd < 0)
-               return;
-
-       RTE_LOG(INFO, VHOST_CONFIG, "new vhost user connection is %d\n", fd);
-       vhost_user_add_connection(fd, vsocket);
-}
-
-static void
-vhost_user_read_cb(int connfd, void *dat, int *remove)
-{
-       struct vhost_user_connection *conn = dat;
-       struct vhost_user_socket *vsocket = conn->vsocket;
-       int ret;
-
-       ret = vhost_user_msg_handler(conn->vid, connfd);
-       if (ret < 0) {
-               close(connfd);
-               *remove = 1;
-               vhost_destroy_device(conn->vid);
-
-               if (vsocket->notify_ops->destroy_connection)
-                       vsocket->notify_ops->destroy_connection(conn->vid);
-
-               pthread_mutex_lock(&vsocket->conn_mutex);
-               TAILQ_REMOVE(&vsocket->conn_list, conn, next);
-               pthread_mutex_unlock(&vsocket->conn_mutex);
-
-               free(conn);
-
-               if (vsocket->reconnect) {
-                       create_unix_socket(vsocket);
-                       vhost_user_start_client(vsocket);
-               }
-       }
-}
-
-static int
-create_unix_socket(struct vhost_user_socket *vsocket)
-{
-       int fd;
-       struct sockaddr_un *un = &vsocket->un;
-
-       fd = socket(AF_UNIX, SOCK_STREAM, 0);
-       if (fd < 0)
-               return -1;
-       RTE_LOG(INFO, VHOST_CONFIG, "vhost-user %s: socket created, fd: %d\n",
-               vsocket->is_server ? "server" : "client", fd);
-
-       if (!vsocket->is_server && fcntl(fd, F_SETFL, O_NONBLOCK)) {
-               RTE_LOG(ERR, VHOST_CONFIG,
-                       "vhost-user: can't set nonblocking mode for socket, fd: 
"
-                       "%d (%s)\n", fd, strerror(errno));
-               close(fd);
-               return -1;
-       }
-
-       memset(un, 0, sizeof(*un));
-       un->sun_family = AF_UNIX;
-       strncpy(un->sun_path, vsocket->path, sizeof(un->sun_path));
-       un->sun_path[sizeof(un->sun_path) - 1] = '\0';
-
-       vsocket->socket_fd = fd;
-       return 0;
-}
-
-static int
-vhost_user_start_server(struct vhost_user_socket *vsocket)
-{
-       int ret;
-       int fd = vsocket->socket_fd;
-       const char *path = vsocket->path;
-
-       ret = bind(fd, (struct sockaddr *)&vsocket->un, sizeof(vsocket->un));
-       if (ret < 0) {
-               RTE_LOG(ERR, VHOST_CONFIG,
-                       "failed to bind to %s: %s; remove it and try again\n",
-                       path, strerror(errno));
-               goto err;
-       }
-       RTE_LOG(INFO, VHOST_CONFIG, "bind to %s\n", path);
-
-       ret = listen(fd, MAX_VIRTIO_BACKLOG);
-       if (ret < 0)
-               goto err;
-
-       ret = fdset_add(&vhost_user.fdset, fd, vhost_user_server_new_connection,
-                 NULL, vsocket);
-       if (ret < 0) {
-               RTE_LOG(ERR, VHOST_CONFIG,
-                       "failed to add listen fd %d to vhost server fdset\n",
-                       fd);
-               goto err;
-       }
-
-       return 0;
-
-err:
-       close(fd);
-       return -1;
-}
-
-struct vhost_user_reconnect {
-       struct sockaddr_un un;
-       int fd;
-       struct vhost_user_socket *vsocket;
-
-       TAILQ_ENTRY(vhost_user_reconnect) next;
-};
-
-TAILQ_HEAD(vhost_user_reconnect_tailq_list, vhost_user_reconnect);
-struct vhost_user_reconnect_list {
-       struct vhost_user_reconnect_tailq_list head;
-       pthread_mutex_t mutex;
-};
-
-static struct vhost_user_reconnect_list reconn_list;
-static pthread_t reconn_tid;
-
-static int
-vhost_user_connect_nonblock(int fd, struct sockaddr *un, size_t sz)
-{
-       int ret, flags;
-
-       ret = connect(fd, un, sz);
-       if (ret < 0 && errno != EISCONN)
-               return -1;
-
-       flags = fcntl(fd, F_GETFL, 0);
-       if (flags < 0) {
-               RTE_LOG(ERR, VHOST_CONFIG,
-                       "can't get flags for connfd %d\n", fd);
-               return -2;
-       }
-       if ((flags & O_NONBLOCK) && fcntl(fd, F_SETFL, flags & ~O_NONBLOCK)) {
-               RTE_LOG(ERR, VHOST_CONFIG,
-                               "can't disable nonblocking on fd %d\n", fd);
-               return -2;
-       }
-       return 0;
-}
-
-static void *
-vhost_user_client_reconnect(void *arg __rte_unused)
-{
-       int ret;
-       struct vhost_user_reconnect *reconn, *next;
-
-       while (1) {
-               pthread_mutex_lock(&reconn_list.mutex);
-
-               /*
-                * An equal implementation of TAILQ_FOREACH_SAFE,
-                * which does not exist on all platforms.
-                */
-               for (reconn = TAILQ_FIRST(&reconn_list.head);
-                    reconn != NULL; reconn = next) {
-                       next = TAILQ_NEXT(reconn, next);
-
-                       ret = vhost_user_connect_nonblock(reconn->fd,
-                                               (struct sockaddr *)&reconn->un,
-                                               sizeof(reconn->un));
-                       if (ret == -2) {
-                               close(reconn->fd);
-                               RTE_LOG(ERR, VHOST_CONFIG,
-                                       "reconnection for fd %d failed\n",
-                                       reconn->fd);
-                               goto remove_fd;
-                       }
-                       if (ret == -1)
-                               continue;
-
-                       RTE_LOG(INFO, VHOST_CONFIG,
-                               "%s: connected\n", reconn->vsocket->path);
-                       vhost_user_add_connection(reconn->fd, reconn->vsocket);
-remove_fd:
-                       TAILQ_REMOVE(&reconn_list.head, reconn, next);
-                       free(reconn);
-               }
-
-               pthread_mutex_unlock(&reconn_list.mutex);
-               sleep(1);
-       }
-
-       return NULL;
-}
-
-static int
-vhost_user_reconnect_init(void)
-{
-       int ret;
-       char thread_name[RTE_MAX_THREAD_NAME_LEN];
-
-       ret = pthread_mutex_init(&reconn_list.mutex, NULL);
-       if (ret < 0) {
-               RTE_LOG(ERR, VHOST_CONFIG, "failed to initialize mutex");
-               return ret;
-       }
-       TAILQ_INIT(&reconn_list.head);
-
-       ret = pthread_create(&reconn_tid, NULL,
-                            vhost_user_client_reconnect, NULL);
-       if (ret != 0) {
-               RTE_LOG(ERR, VHOST_CONFIG, "failed to create reconnect thread");
-               if (pthread_mutex_destroy(&reconn_list.mutex)) {
-                       RTE_LOG(ERR, VHOST_CONFIG,
-                               "failed to destroy reconnect mutex");
-               }
-       } else {
-               snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN,
-                        "vhost-reconn");
-
-               if (rte_thread_setname(reconn_tid, thread_name)) {
-                       RTE_LOG(DEBUG, VHOST_CONFIG,
-                               "failed to set reconnect thread name");
-               }
-       }
-
-       return ret;
-}
-
-static int
-vhost_user_start_client(struct vhost_user_socket *vsocket)
-{
-       int ret;
-       int fd = vsocket->socket_fd;
-       const char *path = vsocket->path;
-       struct vhost_user_reconnect *reconn;
-
-       ret = vhost_user_connect_nonblock(fd, (struct sockaddr *)&vsocket->un,
-                                         sizeof(vsocket->un));
-       if (ret == 0) {
-               vhost_user_add_connection(fd, vsocket);
-               return 0;
-       }
-
-       RTE_LOG(WARNING, VHOST_CONFIG,
-               "failed to connect to %s: %s\n",
-               path, strerror(errno));
-
-       if (ret == -2 || !vsocket->reconnect) {
-               close(fd);
-               return -1;
-       }
-
-       RTE_LOG(INFO, VHOST_CONFIG, "%s: reconnecting...\n", path);
-       reconn = malloc(sizeof(*reconn));
-       if (reconn == NULL) {
-               RTE_LOG(ERR, VHOST_CONFIG,
-                       "failed to allocate memory for reconnect\n");
-               close(fd);
-               return -1;
-       }
-       reconn->un = vsocket->un;
-       reconn->fd = fd;
-       reconn->vsocket = vsocket;
-       pthread_mutex_lock(&reconn_list.mutex);
-       TAILQ_INSERT_TAIL(&reconn_list.head, reconn, next);
-       pthread_mutex_unlock(&reconn_list.mutex);
-
-       return 0;
-}
-
 static struct vhost_user_socket *
 find_vhost_user_socket(const char *path)
 {
@@ -688,30 +213,6 @@ rte_vhost_driver_register(const char *path, uint64_t flags)
        return ret;
 }
 
-static bool
-vhost_user_remove_reconnect(struct vhost_user_socket *vsocket)
-{
-       int found = false;
-       struct vhost_user_reconnect *reconn, *next;
-
-       pthread_mutex_lock(&reconn_list.mutex);
-
-       for (reconn = TAILQ_FIRST(&reconn_list.head);
-            reconn != NULL; reconn = next) {
-               next = TAILQ_NEXT(reconn, next);
-
-               if (reconn->vsocket == vsocket) {
-                       TAILQ_REMOVE(&reconn_list.head, reconn, next);
-                       close(reconn->fd);
-                       free(reconn);
-                       found = true;
-                       break;
-               }
-       }
-       pthread_mutex_unlock(&reconn_list.mutex);
-       return found;
-}
-
 /**
  * Unregister the specified vhost socket
  */
diff --git a/lib/librte_vhost/trans_af_unix.c b/lib/librte_vhost/trans_af_unix.c
index 9ed04b7eb..636f69916 100644
--- a/lib/librte_vhost/trans_af_unix.c
+++ b/lib/librte_vhost/trans_af_unix.c
@@ -33,7 +33,458 @@
  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  */
 
+#include <fcntl.h>
+
+#include <rte_log.h>
+
 #include "vhost.h"
+#include "vhost_user.h"
+
+#define MAX_VIRTIO_BACKLOG 128
+
+static void vhost_user_read_cb(int connfd, void *dat, int *remove);
+
+/* return bytes# of read on success or negative val on failure. */
+int
+read_fd_message(int sockfd, char *buf, int buflen, int *fds, int fd_num)
+{
+       struct iovec iov;
+       struct msghdr msgh;
+       size_t fdsize = fd_num * sizeof(int);
+       char control[CMSG_SPACE(fdsize)];
+       struct cmsghdr *cmsg;
+       int ret;
+
+       memset(&msgh, 0, sizeof(msgh));
+       iov.iov_base = buf;
+       iov.iov_len  = buflen;
+
+       msgh.msg_iov = &iov;
+       msgh.msg_iovlen = 1;
+       msgh.msg_control = control;
+       msgh.msg_controllen = sizeof(control);
+
+       ret = recvmsg(sockfd, &msgh, 0);
+       if (ret <= 0) {
+               RTE_LOG(ERR, VHOST_CONFIG, "recvmsg failed\n");
+               return ret;
+       }
+
+       if (msgh.msg_flags & (MSG_TRUNC | MSG_CTRUNC)) {
+               RTE_LOG(ERR, VHOST_CONFIG, "truncted msg\n");
+               return -1;
+       }
+
+       for (cmsg = CMSG_FIRSTHDR(&msgh); cmsg != NULL;
+               cmsg = CMSG_NXTHDR(&msgh, cmsg)) {
+               if ((cmsg->cmsg_level == SOL_SOCKET) &&
+                       (cmsg->cmsg_type == SCM_RIGHTS)) {
+                       memcpy(fds, CMSG_DATA(cmsg), fdsize);
+                       break;
+               }
+       }
+
+       return ret;
+}
+
+int
+send_fd_message(int sockfd, char *buf, int buflen, int *fds, int fd_num)
+{
+
+       struct iovec iov;
+       struct msghdr msgh;
+       size_t fdsize = fd_num * sizeof(int);
+       char control[CMSG_SPACE(fdsize)];
+       struct cmsghdr *cmsg;
+       int ret;
+
+       memset(&msgh, 0, sizeof(msgh));
+       iov.iov_base = buf;
+       iov.iov_len = buflen;
+
+       msgh.msg_iov = &iov;
+       msgh.msg_iovlen = 1;
+
+       if (fds && fd_num > 0) {
+               msgh.msg_control = control;
+               msgh.msg_controllen = sizeof(control);
+               cmsg = CMSG_FIRSTHDR(&msgh);
+               cmsg->cmsg_len = CMSG_LEN(fdsize);
+               cmsg->cmsg_level = SOL_SOCKET;
+               cmsg->cmsg_type = SCM_RIGHTS;
+               memcpy(CMSG_DATA(cmsg), fds, fdsize);
+       } else {
+               msgh.msg_control = NULL;
+               msgh.msg_controllen = 0;
+       }
+
+       do {
+               ret = sendmsg(sockfd, &msgh, 0);
+       } while (ret < 0 && errno == EINTR);
+
+       if (ret < 0) {
+               RTE_LOG(ERR, VHOST_CONFIG,  "sendmsg error\n");
+               return ret;
+       }
+
+       return ret;
+}
+
+static void
+vhost_user_add_connection(int fd, struct vhost_user_socket *vsocket)
+{
+       int vid;
+       size_t size;
+       struct vhost_user_connection *conn;
+       int ret;
+
+       conn = malloc(sizeof(*conn));
+       if (conn == NULL) {
+               close(fd);
+               return;
+       }
+
+       vid = vhost_new_device();
+       if (vid == -1) {
+               goto err;
+       }
+
+       size = strnlen(vsocket->path, PATH_MAX);
+       vhost_set_ifname(vid, vsocket->path, size);
+
+       if (vsocket->dequeue_zero_copy)
+               vhost_enable_dequeue_zero_copy(vid);
+
+       RTE_LOG(INFO, VHOST_CONFIG, "new device, handle is %d\n", vid);
+
+       if (vsocket->notify_ops->new_connection) {
+               ret = vsocket->notify_ops->new_connection(vid);
+               if (ret < 0) {
+                       RTE_LOG(ERR, VHOST_CONFIG,
+                               "failed to add vhost user connection with fd 
%d\n",
+                               fd);
+                       goto err;
+               }
+       }
+
+       conn->connfd = fd;
+       conn->vsocket = vsocket;
+       conn->vid = vid;
+       ret = fdset_add(&vhost_user.fdset, fd, vhost_user_read_cb,
+                       NULL, conn);
+       if (ret < 0) {
+               RTE_LOG(ERR, VHOST_CONFIG,
+                       "failed to add fd %d into vhost server fdset\n",
+                       fd);
+
+               if (vsocket->notify_ops->destroy_connection)
+                       vsocket->notify_ops->destroy_connection(conn->vid);
+
+               goto err;
+       }
+
+       pthread_mutex_lock(&vsocket->conn_mutex);
+       TAILQ_INSERT_TAIL(&vsocket->conn_list, conn, next);
+       pthread_mutex_unlock(&vsocket->conn_mutex);
+       return;
+
+err:
+       free(conn);
+       close(fd);
+}
+
+/* call back when there is new vhost-user connection from client  */
+static void
+vhost_user_server_new_connection(int fd, void *dat, int *remove __rte_unused)
+{
+       struct vhost_user_socket *vsocket = dat;
+
+       fd = accept(fd, NULL, NULL);
+       if (fd < 0)
+               return;
+
+       RTE_LOG(INFO, VHOST_CONFIG, "new vhost user connection is %d\n", fd);
+       vhost_user_add_connection(fd, vsocket);
+}
+
+static void
+vhost_user_read_cb(int connfd, void *dat, int *remove)
+{
+       struct vhost_user_connection *conn = dat;
+       struct vhost_user_socket *vsocket = conn->vsocket;
+       int ret;
+
+       ret = vhost_user_msg_handler(conn->vid, connfd);
+       if (ret < 0) {
+               close(connfd);
+               *remove = 1;
+               vhost_destroy_device(conn->vid);
+
+               if (vsocket->notify_ops->destroy_connection)
+                       vsocket->notify_ops->destroy_connection(conn->vid);
+
+               pthread_mutex_lock(&vsocket->conn_mutex);
+               TAILQ_REMOVE(&vsocket->conn_list, conn, next);
+               pthread_mutex_unlock(&vsocket->conn_mutex);
+
+               free(conn);
+
+               if (vsocket->reconnect) {
+                       create_unix_socket(vsocket);
+                       vhost_user_start_client(vsocket);
+               }
+       }
+}
+
+int
+create_unix_socket(struct vhost_user_socket *vsocket)
+{
+       int fd;
+       struct sockaddr_un *un = &vsocket->un;
+
+       fd = socket(AF_UNIX, SOCK_STREAM, 0);
+       if (fd < 0)
+               return -1;
+       RTE_LOG(INFO, VHOST_CONFIG, "vhost-user %s: socket created, fd: %d\n",
+               vsocket->is_server ? "server" : "client", fd);
+
+       if (!vsocket->is_server && fcntl(fd, F_SETFL, O_NONBLOCK)) {
+               RTE_LOG(ERR, VHOST_CONFIG,
+                       "vhost-user: can't set nonblocking mode for socket, fd: 
"
+                       "%d (%s)\n", fd, strerror(errno));
+               close(fd);
+               return -1;
+       }
+
+       memset(un, 0, sizeof(*un));
+       un->sun_family = AF_UNIX;
+       strncpy(un->sun_path, vsocket->path, sizeof(un->sun_path));
+       un->sun_path[sizeof(un->sun_path) - 1] = '\0';
+
+       vsocket->socket_fd = fd;
+       return 0;
+}
+
+int
+vhost_user_start_server(struct vhost_user_socket *vsocket)
+{
+       int ret;
+       int fd = vsocket->socket_fd;
+       const char *path = vsocket->path;
+
+       ret = bind(fd, (struct sockaddr *)&vsocket->un, sizeof(vsocket->un));
+       if (ret < 0) {
+               RTE_LOG(ERR, VHOST_CONFIG,
+                       "failed to bind to %s: %s; remove it and try again\n",
+                       path, strerror(errno));
+               goto err;
+       }
+       RTE_LOG(INFO, VHOST_CONFIG, "bind to %s\n", path);
+
+       ret = listen(fd, MAX_VIRTIO_BACKLOG);
+       if (ret < 0)
+               goto err;
+
+       ret = fdset_add(&vhost_user.fdset, fd, vhost_user_server_new_connection,
+                 NULL, vsocket);
+       if (ret < 0) {
+               RTE_LOG(ERR, VHOST_CONFIG,
+                       "failed to add listen fd %d to vhost server fdset\n",
+                       fd);
+               goto err;
+       }
+
+       return 0;
+
+err:
+       close(fd);
+       return -1;
+}
+
+struct vhost_user_reconnect {
+       struct sockaddr_un un;
+       int fd;
+       struct vhost_user_socket *vsocket;
+
+       TAILQ_ENTRY(vhost_user_reconnect) next;
+};
+
+TAILQ_HEAD(vhost_user_reconnect_tailq_list, vhost_user_reconnect);
+struct vhost_user_reconnect_list {
+       struct vhost_user_reconnect_tailq_list head;
+       pthread_mutex_t mutex;
+};
+
+static struct vhost_user_reconnect_list reconn_list;
+pthread_t reconn_tid;
+
+static int
+vhost_user_connect_nonblock(int fd, struct sockaddr *un, size_t sz)
+{
+       int ret, flags;
+
+       ret = connect(fd, un, sz);
+       if (ret < 0 && errno != EISCONN)
+               return -1;
+
+       flags = fcntl(fd, F_GETFL, 0);
+       if (flags < 0) {
+               RTE_LOG(ERR, VHOST_CONFIG,
+                       "can't get flags for connfd %d\n", fd);
+               return -2;
+       }
+       if ((flags & O_NONBLOCK) && fcntl(fd, F_SETFL, flags & ~O_NONBLOCK)) {
+               RTE_LOG(ERR, VHOST_CONFIG,
+                               "can't disable nonblocking on fd %d\n", fd);
+               return -2;
+       }
+       return 0;
+}
+
+static void *
+vhost_user_client_reconnect(void *arg __rte_unused)
+{
+       int ret;
+       struct vhost_user_reconnect *reconn, *next;
+
+       while (1) {
+               pthread_mutex_lock(&reconn_list.mutex);
+
+               /*
+                * An equal implementation of TAILQ_FOREACH_SAFE,
+                * which does not exist on all platforms.
+                */
+               for (reconn = TAILQ_FIRST(&reconn_list.head);
+                    reconn != NULL; reconn = next) {
+                       next = TAILQ_NEXT(reconn, next);
+
+                       ret = vhost_user_connect_nonblock(reconn->fd,
+                                               (struct sockaddr *)&reconn->un,
+                                               sizeof(reconn->un));
+                       if (ret == -2) {
+                               close(reconn->fd);
+                               RTE_LOG(ERR, VHOST_CONFIG,
+                                       "reconnection for fd %d failed\n",
+                                       reconn->fd);
+                               goto remove_fd;
+                       }
+                       if (ret == -1)
+                               continue;
+
+                       RTE_LOG(INFO, VHOST_CONFIG,
+                               "%s: connected\n", reconn->vsocket->path);
+                       vhost_user_add_connection(reconn->fd, reconn->vsocket);
+remove_fd:
+                       TAILQ_REMOVE(&reconn_list.head, reconn, next);
+                       free(reconn);
+               }
+
+               pthread_mutex_unlock(&reconn_list.mutex);
+               sleep(1);
+       }
+
+       return NULL;
+}
+
+int
+vhost_user_reconnect_init(void)
+{
+       int ret;
+       char thread_name[RTE_MAX_THREAD_NAME_LEN];
+
+       ret = pthread_mutex_init(&reconn_list.mutex, NULL);
+       if (ret < 0) {
+               RTE_LOG(ERR, VHOST_CONFIG, "failed to initialize mutex");
+               return ret;
+       }
+       TAILQ_INIT(&reconn_list.head);
+
+       ret = pthread_create(&reconn_tid, NULL,
+                            vhost_user_client_reconnect, NULL);
+       if (ret != 0) {
+               RTE_LOG(ERR, VHOST_CONFIG, "failed to create reconnect thread");
+               if (pthread_mutex_destroy(&reconn_list.mutex)) {
+                       RTE_LOG(ERR, VHOST_CONFIG,
+                               "failed to destroy reconnect mutex");
+               }
+       } else {
+               snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN,
+                        "vhost-reconn");
+
+               if (rte_thread_setname(reconn_tid, thread_name)) {
+                       RTE_LOG(DEBUG, VHOST_CONFIG,
+                               "failed to set reconnect thread name");
+               }
+       }
+
+       return ret;
+}
+
+int
+vhost_user_start_client(struct vhost_user_socket *vsocket)
+{
+       int ret;
+       int fd = vsocket->socket_fd;
+       const char *path = vsocket->path;
+       struct vhost_user_reconnect *reconn;
+
+       ret = vhost_user_connect_nonblock(fd, (struct sockaddr *)&vsocket->un,
+                                         sizeof(vsocket->un));
+       if (ret == 0) {
+               vhost_user_add_connection(fd, vsocket);
+               return 0;
+       }
+
+       RTE_LOG(WARNING, VHOST_CONFIG,
+               "failed to connect to %s: %s\n",
+               path, strerror(errno));
+
+       if (ret == -2 || !vsocket->reconnect) {
+               close(fd);
+               return -1;
+       }
+
+       RTE_LOG(INFO, VHOST_CONFIG, "%s: reconnecting...\n", path);
+       reconn = malloc(sizeof(*reconn));
+       if (reconn == NULL) {
+               RTE_LOG(ERR, VHOST_CONFIG,
+                       "failed to allocate memory for reconnect\n");
+               close(fd);
+               return -1;
+       }
+       reconn->un = vsocket->un;
+       reconn->fd = fd;
+       reconn->vsocket = vsocket;
+       pthread_mutex_lock(&reconn_list.mutex);
+       TAILQ_INSERT_TAIL(&reconn_list.head, reconn, next);
+       pthread_mutex_unlock(&reconn_list.mutex);
+
+       return 0;
+}
+
+bool
+vhost_user_remove_reconnect(struct vhost_user_socket *vsocket)
+{
+       int found = false;
+       struct vhost_user_reconnect *reconn, *next;
+
+       pthread_mutex_lock(&reconn_list.mutex);
+
+       for (reconn = TAILQ_FIRST(&reconn_list.head);
+            reconn != NULL; reconn = next) {
+               next = TAILQ_NEXT(reconn, next);
+
+               if (reconn->vsocket == vsocket) {
+                       TAILQ_REMOVE(&reconn_list.head, reconn, next);
+                       close(reconn->fd);
+                       free(reconn);
+                       found = true;
+                       break;
+               }
+       }
+       pthread_mutex_unlock(&reconn_list.mutex);
+       return found;
+}
 
 static int
 af_unix_vring_call(struct virtio_net *dev __rte_unused,
-- 
2.14.3

Reply via email to