Add a new flag, RTE_VHOST_USER_SOCKETPAIR_BROKER, which indicates that the
provided socket path refers to a SocketPair Broker socket, in the following format:

  '/path/to/socketpair/broker.sock,broker-key=<key>'

This format was chosen to avoid extensive code changes and refactoring
inside the vhost library, mainly because the vhost library treats the
socket path as a unique device identifier.

'<key>' is a broker key that will be used by the broker to identify
two clients that need to be paired together, i.e. the vhost device
will be connected with a client that provided the same key.

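libspbroker is needed at build time; if it is not found, SocketPair Broker
support is not compiled in and requesting the flag fails at registration.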

Signed-off-by: Ilya Maximets <i.maxim...@ovn.org>
---
 doc/guides/prog_guide/vhost_lib.rst |  10 ++
 lib/librte_vhost/meson.build        |   7 +
 lib/librte_vhost/rte_vhost.h        |   1 +
 lib/librte_vhost/socket.c           | 245 +++++++++++++++++++++++++---
 4 files changed, 237 insertions(+), 26 deletions(-)

diff --git a/doc/guides/prog_guide/vhost_lib.rst b/doc/guides/prog_guide/vhost_lib.rst
index dc29229167..f0f0d3fde7 100644
--- a/doc/guides/prog_guide/vhost_lib.rst
+++ b/doc/guides/prog_guide/vhost_lib.rst
@@ -118,6 +118,16 @@ The following is an overview of some key Vhost API functions:
 
     It is disabled by default.
 
+  - ``RTE_VHOST_USER_SOCKETPAIR_BROKER``
+
+    Enabling this flag makes the vhost library treat the socket ``path`` as a
+    path to a SocketPair Broker.  In this case ``path`` should include
+    ``,broker-key=<key>`` after the actual broker's socket path.  ``<key>``
+    will be used as the broker key, so the broker will be able to connect the
+    two processes that provided the same key.
+
+    Incompatible with ``RTE_VHOST_USER_NO_RECONNECT``.
+
 * ``rte_vhost_driver_set_features(path, features)``
 
   This function sets the feature bits the vhost-user driver supports. The
diff --git a/lib/librte_vhost/meson.build b/lib/librte_vhost/meson.build
index 6185deab33..3292edcb52 100644
--- a/lib/librte_vhost/meson.build
+++ b/lib/librte_vhost/meson.build
@@ -15,6 +15,13 @@ elif (toolchain == 'clang' and cc.version().version_compare('>=3.7.0'))
 elif (toolchain == 'icc' and cc.version().version_compare('>=16.0.0'))
        cflags += '-DVHOST_ICC_UNROLL_PRAGMA'
 endif
+
+spbroker_dep = dependency('spbroker', required: false)
+if spbroker_dep.found()
+       dpdk_conf.set('RTE_LIBRTE_VHOST_SOCKETPAIR_BROKER', 1)
+       ext_deps += spbroker_dep
+endif
+
 dpdk_conf.set('RTE_LIBRTE_VHOST_POSTCOPY',
              cc.has_header('linux/userfaultfd.h'))
 cflags += '-fno-strict-aliasing'
diff --git a/lib/librte_vhost/rte_vhost.h b/lib/librte_vhost/rte_vhost.h
index 010f160869..87662c9f7f 100644
--- a/lib/librte_vhost/rte_vhost.h
+++ b/lib/librte_vhost/rte_vhost.h
@@ -36,6 +36,7 @@ extern "C" {
 /* support only linear buffers (no chained mbufs) */
 #define RTE_VHOST_USER_LINEARBUF_SUPPORT       (1ULL << 6)
 #define RTE_VHOST_USER_ASYNC_COPY      (1ULL << 7)
+#define RTE_VHOST_USER_SOCKETPAIR_BROKER       (1ULL << 8)
 
 /* Features. */
 #ifndef VIRTIO_NET_F_GUEST_ANNOUNCE
diff --git a/lib/librte_vhost/socket.c b/lib/librte_vhost/socket.c
index 0169d36481..f0a1c9044c 100644
--- a/lib/librte_vhost/socket.c
+++ b/lib/librte_vhost/socket.c
@@ -16,6 +16,10 @@
 #include <fcntl.h>
 #include <pthread.h>
 
+#ifdef RTE_LIBRTE_VHOST_SOCKETPAIR_BROKER
+#include <socketpair-broker/helper.h>
+#endif
+
 #include <rte_log.h>
 
 #include "fd_man.h"
@@ -33,9 +37,11 @@ struct vhost_user_socket {
        struct vhost_user_connection_list conn_list;
        pthread_mutex_t conn_mutex;
        char *path;
+       char *broker_key;
        int socket_fd;
        struct sockaddr_un un;
        bool is_server;
+       bool is_broker;
        bool reconnect;
        bool iommu_support;
        bool use_builtin_virtio_net;
@@ -81,7 +87,8 @@ struct vhost_user {
 static void vhost_user_server_new_connection(int fd, void *data, int *remove);
 static void vhost_user_read_cb(int fd, void *dat, int *remove);
 static int create_unix_socket(struct vhost_user_socket *vsocket);
-static int vhost_user_start_client(struct vhost_user_socket *vsocket);
+static int recreate_unix_socket(struct vhost_user_socket *vsocket);
+static int vhost_user_start(struct vhost_user_socket *vsocket);
 
 static struct vhost_user vhost_user = {
        .fdset = {
@@ -283,6 +290,81 @@ vhost_user_add_connection(int fd, struct vhost_user_socket *vsocket)
        close(fd);
 }
 
+#ifdef RTE_LIBRTE_VHOST_SOCKETPAIR_BROKER
+static int
+vhost_user_broker_msg_handler(int fd, struct vhost_user_socket *vsocket)
+{
+       int peer_fd;
+       char *err;
+
+       peer_fd = sp_broker_receive_set_pair(fd, &err);
+       if (peer_fd < 0) {
+               VHOST_LOG_CONFIG(ERR,
+                       "failed to receive SP_BROKER_SET_PAIR on fd %d: %s\n",
+                       fd, err);
+               free(err);
+               return -1;
+       }
+
+       VHOST_LOG_CONFIG(INFO, "new vhost user connection is %d\n", peer_fd);
+       vhost_user_add_connection(peer_fd, vsocket);
+       return 0;
+}
+
+static void
+vhost_user_broker_msg_cb(int connfd, void *dat, int *remove)
+{
+       struct vhost_user_socket *vsocket = dat;
+       int ret;
+
+       ret = vhost_user_broker_msg_handler(connfd, vsocket);
+
+       /* Don't need a broker connection anymore. */
+       *remove = 1;
+
+       if (ret < 0) {
+               recreate_unix_socket(vsocket);
+               vhost_user_start(vsocket);
+       }
+}
+
+static int
+vhost_user_start_broker_connection(
+       int fd __rte_unused,
+       struct vhost_user_socket *vsocket __rte_unused)
+{
+       char *err;
+       int ret;
+
+       ret = sp_broker_send_get_pair(fd, vsocket->broker_key,
+                                     vsocket->is_server, &err);
+       if (ret) {
+               VHOST_LOG_CONFIG(ERR,
+                       "failed to send SP_BROKER_GET_PAIR request on fd %d: 
%s\n",
+                       fd, err);
+               free(err);
+               return -1;
+       }
+
+       ret = fdset_add(&vhost_user.fdset, fd, vhost_user_broker_msg_cb,
+                       NULL, vsocket);
+       if (ret < 0) {
+               VHOST_LOG_CONFIG(ERR,
+                       "failed to add broker fd %d to vhost fdset\n", fd);
+               return -1;
+       }
+       return 0;
+}
+#else
+static int
+vhost_user_start_broker_connection(
+       int fd __rte_unused,
+       struct vhost_user_socket *vsocket __rte_unused)
+{
+       return -1;
+}
+#endif
+
 /* call back when there is new vhost-user connection from client  */
 static void
 vhost_user_server_new_connection(int fd, void *dat, int *remove __rte_unused)
@@ -321,7 +403,7 @@ vhost_user_read_cb(int connfd, void *dat, int *remove)
 
                if (vsocket->reconnect) {
                        create_unix_socket(vsocket);
-                       vhost_user_start_client(vsocket);
+                       vhost_user_start(vsocket);
                }
 
                pthread_mutex_lock(&vsocket->conn_mutex);
@@ -337,14 +419,17 @@ create_unix_socket(struct vhost_user_socket *vsocket)
 {
        int fd;
        struct sockaddr_un *un = &vsocket->un;
+       char *broker_key = strstr(vsocket->path, ",broker-key=");
 
        fd = socket(AF_UNIX, SOCK_STREAM, 0);
        if (fd < 0)
                return -1;
-       VHOST_LOG_CONFIG(INFO, "vhost-user %s: socket created, fd: %d\n",
-               vsocket->is_server ? "server" : "client", fd);
+       VHOST_LOG_CONFIG(INFO, "vhost-user %s: %ssocket created, fd: %d\n",
+               vsocket->is_server ? "server" : "client",
+               vsocket->is_broker ? "broker " : "", fd);
 
-       if (!vsocket->is_server && fcntl(fd, F_SETFL, O_NONBLOCK)) {
+       if ((!vsocket->is_server || vsocket->is_broker)
+           && fcntl(fd, F_SETFL, O_NONBLOCK)) {
                VHOST_LOG_CONFIG(ERR,
                        "vhost-user: can't set nonblocking mode for socket, fd: 
"
                        "%d (%s)\n", fd, strerror(errno));
@@ -352,12 +437,21 @@ create_unix_socket(struct vhost_user_socket *vsocket)
                return -1;
        }
 
+       /* Temporarily limiting the path by the actual path. */
+       if (vsocket->is_broker && broker_key)
+               broker_key[0] = '\0';
+
        memset(un, 0, sizeof(*un));
        un->sun_family = AF_UNIX;
        strncpy(un->sun_path, vsocket->path, sizeof(un->sun_path));
        un->sun_path[sizeof(un->sun_path) - 1] = '\0';
 
        vsocket->socket_fd = fd;
+
+       /* Restoring original path. */
+       if (vsocket->is_broker && broker_key)
+               broker_key[0] = ',';
+
        return 0;
 }
 
@@ -425,7 +519,8 @@ static struct vhost_user_reconnect_list reconn_list;
 static pthread_t reconn_tid;
 
 static int
-vhost_user_connect_nonblock(int fd, struct sockaddr *un, size_t sz)
+vhost_user_connect_nonblock(int fd, struct sockaddr *un, size_t sz,
+                           bool disable_nonblock)
 {
        int ret, flags;
 
@@ -433,6 +528,9 @@ vhost_user_connect_nonblock(int fd, struct sockaddr *un, size_t sz)
        if (ret < 0 && errno != EISCONN)
                return -1;
 
+       if (!disable_nonblock)
+               return 0;
+
        flags = fcntl(fd, F_GETFL, 0);
        if (flags < 0) {
                VHOST_LOG_CONFIG(ERR,
@@ -447,8 +545,22 @@ vhost_user_connect_nonblock(int fd, struct sockaddr *un, size_t sz)
        return 0;
 }
 
+static int
+recreate_unix_socket(struct vhost_user_socket *vsocket)
+{
+       close(vsocket->socket_fd);
+       if (create_unix_socket(vsocket) < 0) {
+               VHOST_LOG_CONFIG(ERR,
+                       "Failed to re-create socket for %s\n",
+                       vsocket->un.sun_path);
+               vsocket->socket_fd = -1;
+               return -2;
+       }
+       return 0;
+}
+
 static void *
-vhost_user_client_reconnect(void *arg __rte_unused)
+vhost_user_reconnect(void *arg __rte_unused)
 {
        int ret;
        struct vhost_user_reconnect *reconn, *next;
@@ -466,7 +578,8 @@ vhost_user_client_reconnect(void *arg __rte_unused)
 
                        ret = vhost_user_connect_nonblock(reconn->fd,
                                                (struct sockaddr *)&reconn->un,
-                                               sizeof(reconn->un));
+                                               sizeof(reconn->un),
+                                               !reconn->vsocket->is_broker);
                        if (ret == -2) {
                                close(reconn->fd);
                                VHOST_LOG_CONFIG(ERR,
@@ -478,8 +591,26 @@ vhost_user_client_reconnect(void *arg __rte_unused)
                                continue;
 
                        VHOST_LOG_CONFIG(INFO,
-                               "%s: connected\n", reconn->vsocket->path);
-                       vhost_user_add_connection(reconn->fd, reconn->vsocket);
+                               "%s: connected\n",
+                               reconn->vsocket->un.sun_path);
+
+                       if (reconn->vsocket->is_broker) {
+                               struct vhost_user_socket *vsocket;
+
+                               vsocket = reconn->vsocket;
+                               if (vhost_user_start_broker_connection(
+                                       reconn->fd, vsocket)) {
+                                       if (recreate_unix_socket(vsocket)) {
+                                               goto remove_fd;
+                                       } else {
+                                               reconn->fd = vsocket->socket_fd;
+                                               continue;
+                                       }
+                               }
+                       } else {
+                               vhost_user_add_connection(reconn->fd,
+                                                         reconn->vsocket);
+                       }
 remove_fd:
                        TAILQ_REMOVE(&reconn_list.head, reconn, next);
                        free(reconn);
@@ -505,7 +636,7 @@ vhost_user_reconnect_init(void)
        TAILQ_INIT(&reconn_list.head);
 
        ret = rte_ctrl_thread_create(&reconn_tid, "vhost_reconn", NULL,
-                            vhost_user_client_reconnect, NULL);
+                            vhost_user_reconnect, NULL);
        if (ret != 0) {
                VHOST_LOG_CONFIG(ERR, "failed to create reconnect thread");
                if (pthread_mutex_destroy(&reconn_list.mutex)) {
@@ -518,18 +649,25 @@ vhost_user_reconnect_init(void)
 }
 
 static int
-vhost_user_start_client(struct vhost_user_socket *vsocket)
+vhost_user_start(struct vhost_user_socket *vsocket)
 {
        int ret;
        int fd = vsocket->socket_fd;
-       const char *path = vsocket->path;
+       const char *path = vsocket->un.sun_path;
        struct vhost_user_reconnect *reconn;
 
        ret = vhost_user_connect_nonblock(fd, (struct sockaddr *)&vsocket->un,
-                                         sizeof(vsocket->un));
+                                         sizeof(vsocket->un),
+                                         !vsocket->is_broker);
        if (ret == 0) {
-               vhost_user_add_connection(fd, vsocket);
-               return 0;
+               if (!vsocket->is_broker) {
+                       vhost_user_add_connection(fd, vsocket);
+                       return 0;
+               } else if (vhost_user_start_broker_connection(fd, vsocket)) {
+                       ret = recreate_unix_socket(vsocket);
+               } else {
+                       return 0;
+               }
        }
 
        VHOST_LOG_CONFIG(WARNING,
@@ -822,6 +960,11 @@ rte_vhost_driver_get_queue_num(const char *path, uint32_t *queue_num)
 static void
 vhost_user_socket_mem_free(struct vhost_user_socket *vsocket)
 {
+       if (vsocket && vsocket->broker_key) {
+               free(vsocket->broker_key);
+               vsocket->broker_key = NULL;
+       }
+
        if (vsocket && vsocket->path) {
                free(vsocket->path);
                vsocket->path = NULL;
@@ -946,15 +1089,50 @@ rte_vhost_driver_register(const char *path, uint64_t flags)
 #endif
        }
 
-       if ((flags & RTE_VHOST_USER_CLIENT) != 0) {
+       if ((flags & RTE_VHOST_USER_SOCKETPAIR_BROKER) != 0) {
+#ifndef RTE_LIBRTE_VHOST_SOCKETPAIR_BROKER
+               VHOST_LOG_CONFIG(ERR,
+                       "SocketPair Broker requested but not compiled\n");
+               ret = -1;
+               goto out_mutex;
+#endif
+               char *broker_key = strstr(vsocket->path, ",broker-key=");
+
+               if (!broker_key || !broker_key[12]) {
+                       VHOST_LOG_CONFIG(ERR,
+                               "Connection to SocketPair Broker requested but"
+                               " key is not provided: %s\n", vsocket->path);
+                       ret = -1;
+                       goto out_mutex;
+               }
+               vsocket->is_broker = true;
+               vsocket->broker_key = strdup(broker_key + 12);
+               if (vsocket->broker_key == NULL) {
+                       VHOST_LOG_CONFIG(ERR,
+                               "error: failed to copy broker key\n");
+                       ret = -1;
+                       goto out_mutex;
+               }
+       }
+
+       if ((flags & RTE_VHOST_USER_CLIENT) == 0)
+               vsocket->is_server = true;
+
+       if (!vsocket->is_server || vsocket->is_broker) {
                vsocket->reconnect = !(flags & RTE_VHOST_USER_NO_RECONNECT);
+               if (vsocket->is_broker && !vsocket->reconnect) {
+                       VHOST_LOG_CONFIG(ERR,
+                               "SocketPair Broker with NO_RECONNECT "
+                               "is not supported\n");
+                       ret = -1;
+                       goto out_mutex;
+               }
                if (vsocket->reconnect && reconn_tid == 0) {
                        if (vhost_user_reconnect_init() != 0)
                                goto out_mutex;
                }
-       } else {
-               vsocket->is_server = true;
        }
+
        ret = create_unix_socket(vsocket);
        if (ret < 0) {
                goto out_mutex;
@@ -1052,7 +1230,23 @@ rte_vhost_driver_unregister(const char *path)
                        }
                        pthread_mutex_unlock(&vsocket->conn_mutex);
 
-                       if (vsocket->is_server) {
+                       if (vsocket->reconnect) {
+                               if (vhost_user_remove_reconnect(vsocket)) {
+                                       /*
+                                        * reconn->fd is a socket_fd for
+                                        * client and broker connections and
+                                        * it's closed now.
+                                        */
+                                       vsocket->socket_fd = -1;
+                               }
+                       }
+
+                       /*
+                        * socket_fd is still valid for server connection
+                        * or broker connection that is currently connected
+                        * to the broker.
+                        */
+                       if (vsocket->socket_fd != -1) {
                                /*
                                 * If r/wcb is executing, release vhost_user's
                                 * mutex lock, and try again since the r/wcb
@@ -1063,13 +1257,12 @@ rte_vhost_driver_unregister(const char *path)
                                        pthread_mutex_unlock(&vhost_user.mutex);
                                        goto again;
                                }
-
                                close(vsocket->socket_fd);
-                               unlink(path);
-                       } else if (vsocket->reconnect) {
-                               vhost_user_remove_reconnect(vsocket);
                        }
 
+                       if (vsocket->is_server && !vsocket->is_broker)
+                               unlink(path);
+
                        pthread_mutex_destroy(&vsocket->conn_mutex);
                        vhost_user_socket_mem_free(vsocket);
 
@@ -1152,8 +1345,8 @@ rte_vhost_driver_start(const char *path)
                }
        }
 
-       if (vsocket->is_server)
+       if (vsocket->is_server && !vsocket->is_broker)
                return vhost_user_start_server(vsocket);
        else
-               return vhost_user_start_client(vsocket);
+               return vhost_user_start(vsocket);
 }
-- 
2.26.2

Reply via email to