Hi Jianfeng,

> 
> Previously, there were three channels for multi-process
> (i.e., primary/secondary) communication.
>   1. Config-file based channel, in which, the primary process writes
>      info into a pre-defined config file, and the secondary process
>      reads the info out.
>   2. vfio submodule has its own channel based on unix socket for the
>      secondary process to get container fd and group fd from the
>      primary process.
>   3. pdump submodule also has its own channel based on unix socket for
>      packet dump.
> 
> It'd be good to have a generic communication channel for multi-process
> communication to accommodate the requirements including:
>   a. Secondary wants to send info to primary, for example, secondary
>      would like to send request (about some specific vdev to primary).
>   b. Sending info at any time, instead of just initialization time.
>   c. Share FDs with the other side, for vdev like vhost, related FDs
>      (memory region, kick) should be shared.
>   d. A send message request needs the other side to respond immediately.
> 
> This patch proposes to create a communication channel, based on datagram
> unix socket, for above requirements. Each process will block on a unix
> socket waiting for messages from the peers.
> 
> Three new APIs are added:
> 
>   1. rte_eal_mp_action_register() is used to register an action,
>      indexed by a string, when a component at receiver side would like
>      to respond to the messages from the peer processes.
>   2. rte_eal_mp_action_unregister() is used to unregister the action
>      if the calling component does not want to respond to the messages.
>   3. rte_eal_mp_sendmsg() is used to send a message, and returns
>      immediately. If there are 1:n primary:secondary processes, the
>      primary process will send n messages.
> 
> Suggested-by: Konstantin Ananyev <konstantin.anan...@intel.com>
> Signed-off-by: Jianfeng Tan <jianfeng....@intel.com>
> ---
>  lib/librte_eal/common/eal_common_proc.c | 388 ++++++++++++++++++++++++++++++++
>  lib/librte_eal/common/eal_filesystem.h  |  17 ++
>  lib/librte_eal/common/eal_private.h     |  10 +
>  lib/librte_eal/common/include/rte_eal.h |  69 ++++++
>  lib/librte_eal/linuxapp/eal/eal.c       |   8 +
>  lib/librte_eal/rte_eal_version.map      |   9 +
>  6 files changed, 501 insertions(+)
> 
> diff --git a/lib/librte_eal/common/eal_common_proc.c 
> b/lib/librte_eal/common/eal_common_proc.c
> index 40fa982..d700e9e 100644
> --- a/lib/librte_eal/common/eal_common_proc.c
> +++ b/lib/librte_eal/common/eal_common_proc.c
> @@ -5,11 +5,55 @@
>  #include <stdio.h>
>  #include <fcntl.h>
>  #include <stdlib.h>
> +#include <sys/types.h>
> +#include <sys/socket.h>
> +#include <limits.h>
> +#include <unistd.h>
> +#include <sys/un.h>
> +#include <errno.h>
> +#include <pthread.h>
> +
> +#include <rte_log.h>
>  #include <rte_eal.h>
> +#include <rte_errno.h>
> +#include <rte_lcore.h>
> +#include <rte_common.h>
> 
> +#include "eal_private.h"
>  #include "eal_filesystem.h"
>  #include "eal_internal_cfg.h"
> 
> +#define MAX_SECONDARY_PROCS  8
> +#define MAX_ACTION_NAME_LEN  64
> +#define MAX_UNIX_PATH_LEN    104

Why do you need this?
Why not just PATH_MAX?

> +#define MAX_MSG_LENGTH               1024
> +#define SCM_MAX_FD           253 /* The max amount of fds */
> +
> +static int mp_fd = -1;
> +static char *mp_sec_sockets[MAX_SECONDARY_PROCS];

Who will initialize it, and why is it limited to only 8?

> +static pthread_mutex_t mp_mutex_action = PTHREAD_MUTEX_INITIALIZER;
> +
> +struct action_entry {
> +     TAILQ_ENTRY(action_entry) next;      /**< Next attached action entry */
> +
> +#define MAX_ACTION_NAME_LEN  64
> +     char action_name[MAX_ACTION_NAME_LEN];
> +     rte_eal_mp_t action;
> +};
> +
> +/** Double linked list of actions. */
> +TAILQ_HEAD(action_entry_list, action_entry);
> +
> +static struct action_entry_list action_entry_list =
> +     TAILQ_HEAD_INITIALIZER(action_entry_list);
> +
> +struct mp_msghdr {
> +     char action_name[MAX_ACTION_NAME_LEN];
> +     int fds_num;
> +     int len_params;
> +     char params[0];
> +} __rte_packed;
> +
>  int
>  rte_eal_primary_proc_alive(const char *config_file_path)
>  {
> @@ -31,3 +75,347 @@ rte_eal_primary_proc_alive(const char *config_file_path)
> 
>       return !!ret;
>  }
> +
> +static struct action_entry *
> +find_action_entry_by_name(const char *name)
> +{
> +     int len = strlen(name);
> +     struct action_entry *entry;
> +
> +     TAILQ_FOREACH(entry, &action_entry_list, next) {
> +             if (strncmp(entry->action_name, name, len) == 0)

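I think it has to be just strcmp() here: strncmp() bounded by strlen(name)
also matches any registered entry whose name merely starts with 'name'.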


> +                     break;
> +     }
> +
> +     return entry;
> +}
> +
> +int
> +rte_eal_mp_action_register(const char *action_name, rte_eal_mp_t action)
> +{
> +     struct action_entry *entry = malloc(sizeof(struct action_entry));
> +
> +     if (entry == NULL) {
> +             rte_errno = -ENOMEM;
> +             return -1;
> +     }
> +
> +     if (strlen(action_name) > MAX_ACTION_NAME_LEN) {

No space is left for the terminating '\0'.
Either use >= MAX_ACTION_NAME_LEN, or make entry.name[MAX_ACTION_NAME_LEN + 1];
Even better, just:
 - allocate new action_entry.
if (snprintf(action->name, sizeof(action->name), "%s", action_name) >=
        sizeof(action->name)) {
    free(action);
    return -E2BIG;
}


> +             rte_errno = -E2BIG;
> +             return -1;
> +     }
> +
> +     pthread_mutex_lock(&mp_mutex_action);
> +     if (find_action_entry_by_name(action_name) != NULL) {
> +             free(entry);

You forgot to call pthread_mutex_unlock() on this error path.

> +             rte_errno = -EEXIST;
> +             return -1;
> +     }
> +     strncpy(entry->action_name, action_name, MAX_ACTION_NAME_LEN);
> +     entry->action = action;
> +     TAILQ_INSERT_TAIL(&action_entry_list, entry, next);
> +     pthread_mutex_unlock(&mp_mutex_action);
> +     return 0;
> +}
> +
> +void
> +rte_eal_mp_action_unregister(const char *name)
> +{
> +     struct action_entry *entry;
> +
> +     pthread_mutex_lock(&mp_mutex_action);
> +     entry = find_action_entry_by_name(name);
> +     TAILQ_REMOVE(&action_entry_list, entry, next);
> +     free(entry);

Better to do free() after releasing the mutex.

> +     pthread_mutex_unlock(&mp_mutex_action);
> +}
> +
> +static int
> +read_msg(int fd, char *buf, int buflen, int *fds, int fds_num)
> +{
> +     int ret;
> +     struct iovec iov;
> +     struct msghdr msgh;
> +     size_t fdsize = fds_num * sizeof(int);
> +     char control[CMSG_SPACE(fdsize)];
> +     struct cmsghdr *cmsg;
> +
> +     memset(&msgh, 0, sizeof(msgh));
> +     iov.iov_base = buf;
> +     iov.iov_len  = buflen;
> +
> +     msgh.msg_iov = &iov;
> +     msgh.msg_iovlen = 1;
> +     msgh.msg_control = control;
> +     msgh.msg_controllen = sizeof(control);
> +
> +     ret = recvmsg(fd, &msgh, 0);
> +     if (ret < 0) {
> +             RTE_LOG(ERR, EAL, "recvmsg failed, %s\n", strerror(errno));
> +             return -1;
> +     }
> +
> +     if (msgh.msg_flags & (MSG_TRUNC | MSG_CTRUNC)) {
> +             RTE_LOG(ERR, EAL, "truncted msg\n");
> +             return -1;
> +     }
> +
> +     /* read auxiliary FDs if any */
> +     for (cmsg = CMSG_FIRSTHDR(&msgh); cmsg != NULL;
> +             cmsg = CMSG_NXTHDR(&msgh, cmsg)) {
> +             if ((cmsg->cmsg_level == SOL_SOCKET) &&
> +                     (cmsg->cmsg_type == SCM_RIGHTS)) {
> +                     memcpy(fds, CMSG_DATA(cmsg), fdsize);
> +                     break;
> +             }
> +     }
> +
> +     return ret;
> +}
> +
> +static int
> +process_msg(struct mp_msghdr *hdr, int len, int fds[])
> +{
> +     int ret;
> +     int params_len;
> +     struct action_entry *entry;
> +
> +     RTE_LOG(DEBUG, EAL, "msg: %s\n", hdr->action_name);
> +
> +     pthread_mutex_lock(&mp_mutex_action);
> +     entry = find_action_entry_by_name(hdr->action_name);
> +     if (entry == NULL) {
> +             RTE_LOG(ERR, EAL, "cannot find action by: %s\n",
> +                     hdr->action_name);
> +             pthread_mutex_unlock(&mp_mutex_action);
> +             return -1;

If no action is registered for that message, who will free it?
If action() exists, is it the responsibility of action() to free the msg?

> +     }
> +
> +     params_len = len - sizeof(struct mp_msghdr);
> +     ret = entry->action(hdr->params, params_len, fds, hdr->fds_num);

Do you really need to call action() with the lock held?

> +     pthread_mutex_unlock(&mp_mutex_action);
> +     return ret;
> +
> +}
> +
> +static void *

Why not just 'void' here?

> +mp_handle(void *arg __rte_unused)
> +{
> +     int len;
> +     int fds[SCM_MAX_FD];
> +     char buf[MAX_MSG_LENGTH];
> +
> +     while (1) {
> +             len = read_msg(mp_fd, buf, MAX_MSG_LENGTH, fds, SCM_MAX_FD);
> +             if (len > 0)
> +                     process_msg((struct mp_msghdr *)buf, len, fds);
> +     }
> +
> +     return NULL;
> +}
> +
> +static inline const char *
> +get_unix_path(int is_server)
> +{
> +     static char unix_path[MAX_UNIX_PATH_LEN];

PATH_MAX?

Why not just make that function accept char path[PATH_MAX] as a parameter?

> +     const char *prefix = eal_mp_unix_path();
> +     const char *suffix = (is_server) ? "" : "_c";
> +
> +     if (rte_eal_process_type() == RTE_PROC_PRIMARY)
> +             snprintf(unix_path, MAX_UNIX_PATH_LEN, "%s%s", prefix, suffix);
> +     else
> +             snprintf(unix_path, MAX_UNIX_PATH_LEN, "%s%s_%d",
> +                      prefix, suffix, getpid());
> +     return unix_path;
> +}
> +
> +static int
> +open_unix_fd(int is_server)
> +{
> +     int fd;
> +     struct sockaddr_un un;
> +
> +     fd = socket(AF_UNIX, SOCK_DGRAM, 0);
> +     if (fd < 0) {
> +             RTE_LOG(ERR, EAL, "failed to create unix socket\n");
> +             return -1;
> +     }
> +
> +     memset(&un, 0, sizeof(un));
> +     un.sun_family = AF_UNIX;
> +     snprintf(un.sun_path, MAX_UNIX_PATH_LEN, "%s",
> +              get_unix_path(is_server));
> +     unlink(un.sun_path); /* May still exist since last run */
> +     if (bind(fd, (struct sockaddr *)&un, sizeof(un)) < 0) {
> +             RTE_LOG(ERR, EAL, "failed to bind %s: %s\n",
> +                     un.sun_path, strerror(errno));
> +             close(fd);
> +             return -1;
> +     }
> +
> +     RTE_LOG(INFO, EAL, "bind to %s\n", un.sun_path);
> +     return fd;
> +}
> +
> +int
> +rte_eal_mp_channel_init(void)
> +{
> +     pthread_t tid;
> +     char thread_name[RTE_MAX_THREAD_NAME_LEN];
> +
> +     mp_fd = open_unix_fd(1);
> +     if (mp_fd < 0)
> +             return -1;
> +
> +     if (pthread_create(&tid, NULL, mp_handle, NULL) < 0) {
> +             RTE_LOG(ERR, EAL, "failed to create mp handle thead: %s\n",
> +                     strerror(errno));
> +             goto error;
> +     }
> +
> +     snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN, "rte_mp_handle");
> +     if (rte_thread_setname(tid, thread_name) < 0) {
> +             RTE_LOG(ERR, EAL, "failed to set thead name\n");

Forgot to terminate thread?

> +             goto error;

As a nit - can we reorder code a bit to avoid 'goto's?

> +     }
> +
> +     return 0;
> +error:
> +     close(mp_fd);
> +     mp_fd = -1;
> +     return -1;
> +}
> +
> +static inline struct mp_msghdr *
> +format_msg(const char *act_name, const void *p, int len_params, int fds_num)
> +{
> +     int len_msg;
> +     struct mp_msghdr *msg;
> +
> +     len_msg = sizeof(struct mp_msghdr) + len_params;
> +     if (len_msg > MAX_MSG_LENGTH) {
> +             RTE_LOG(ERR, EAL, "Message is too long\n");
> +             rte_errno = -EINVAL;
> +             return NULL;
> +     }
> +
> +     msg = malloc(len_msg);
> +     if (!msg) {
> +             RTE_LOG(ERR, EAL, "Cannot alloc memory for msg\n");
> +             rte_errno = -ENOMEM;
> +             return NULL;
> +     }
> +     memset(msg, 0, len_msg);
> +     strcpy(msg->action_name, act_name);
> +     msg->fds_num = fds_num;
> +     msg->len_params = len_params;
> +     memcpy(msg->params, p, len_params);
> +     return msg;
> +}
> +
> +static int
> +send_msg(int fd, const char *dst_path, struct mp_msghdr *msg, int fds[])
> +{
> +     int ret;
> +     struct msghdr msgh;
> +     struct iovec iov;
> +     size_t fd_size = msg->fds_num * sizeof(int);
> +     char control[CMSG_SPACE(fd_size)];
> +     struct cmsghdr *cmsg;
> +     struct sockaddr_un dst;
> +
> +     memset(&dst, 0, sizeof(dst));
> +     dst.sun_family = AF_UNIX;
> +     snprintf(dst.sun_path, MAX_UNIX_PATH_LEN, "%s", dst_path);
> +
> +     memset(&msgh, 0, sizeof(msgh));
> +     memset(control, 0, sizeof(control));
> +
> +     iov.iov_base = (uint8_t *)msg;
> +     iov.iov_len = sizeof(struct mp_msghdr) + msg->len_params;
> +
> +     msgh.msg_name = &dst;
> +     msgh.msg_namelen = sizeof(dst);
> +     msgh.msg_iov = &iov;
> +     msgh.msg_iovlen = 1;
> +     msgh.msg_control = control;
> +     msgh.msg_controllen = sizeof(control);
> +
> +     cmsg = CMSG_FIRSTHDR(&msgh);
> +     cmsg->cmsg_len = CMSG_LEN(fd_size);
> +     cmsg->cmsg_level = SOL_SOCKET;
> +     cmsg->cmsg_type = SCM_RIGHTS;
> +     memcpy(CMSG_DATA(cmsg), fds, fd_size);
> +
> +     do {
> +             ret = sendmsg(fd, &msgh, 0);
> +     } while (ret < 0 && errno == EINTR);
> +
> +     if (ret < 0) {
> +             RTE_LOG(ERR, EAL, "failed to send msg: %s\n", strerror(errno));
> +
> +             if (rte_eal_process_type() == RTE_PROC_PRIMARY)
> +                     RTE_LOG(ERR, EAL, "secondary process (%s) exited\n",
> +                             dst_path);
> +             else if (!rte_eal_primary_proc_alive(NULL))
> +                     RTE_LOG(ERR, EAL, "primary process exited\n");

So secondary-to-secondary messages are not allowed?

> +
> +             return 0;
> +     }
> +
> +     return 1;
> +}
> +
> +static int
> +mp_send(const char *action_name,
> +     const void *params,
> +     int len_params,
> +     int fds[],
> +     int fds_num)
> +{
> +     int i;
> +     int n = 0;
> +     int sockfd;
> +     struct mp_msghdr *msg;
> +
> +     if (fds_num > SCM_MAX_FD) {
> +             RTE_LOG(ERR, EAL, "Cannot send more than %d FDs\n", SCM_MAX_FD);
> +             rte_errno = -E2BIG;
> +             return 0;
> +     }
> +
> +     msg = format_msg(action_name, params, len_params, fds_num);
> +     if (msg == NULL)
> +             return 0;
> +
> +     if ((sockfd = open_unix_fd(0)) < 0) {
> +             free(msg);
> +             return 0;
> +     }
> +
> +     if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
> +             /* broadcast to all secondaries */
> +             for (i = 0; i < MAX_SECONDARY_PROCS; ++i) {
> +                     if (mp_sec_sockets[i] == NULL)
> +                             continue;
> +
> +                     n += send_msg(sockfd, mp_sec_sockets[i], msg, fds);
> +             }
> +     } else
> +             n += send_msg(sockfd, eal_mp_unix_path(), msg, fds);
> +
> +     free(msg);
> +     close(sockfd);
> +     return n;
> +}
> +
> +int
> +rte_eal_mp_sendmsg(const char *action_name,
> +                const void *params,
> +                int len_params,
> +                int fds[],
> +                int fds_num)
> +{
> +     RTE_LOG(DEBUG, EAL, "sendmsg: %s\n", action_name);
> +     return mp_send(action_name, params, len_params, fds, fds_num);
> +}
> diff --git a/lib/librte_eal/common/eal_filesystem.h 
> b/lib/librte_eal/common/eal_filesystem.h
> index e8959eb..e95399b 100644
> --- a/lib/librte_eal/common/eal_filesystem.h
> +++ b/lib/librte_eal/common/eal_filesystem.h
> @@ -38,6 +38,23 @@ eal_runtime_config_path(void)
>       return buffer;
>  }
> 
> +/** Path of primary/secondary communication unix socket file. */
> +#define MP_UNIX_PATH_FMT "%s/.%s_unix"
> +static inline const char *
> +eal_mp_unix_path(void)
> +{
> +     static char buffer[PATH_MAX]; /* static so auto-zeroed */
> +     const char *directory = default_config_dir;
> +     const char *home_dir = getenv("HOME");
> +
> +     if (getuid() != 0 && home_dir != NULL)
> +             directory = home_dir;
> +     snprintf(buffer, sizeof(buffer) - 1, MP_UNIX_PATH_FMT,
> +              directory, internal_config.hugefile_prefix);
> +
> +     return buffer;
> +}
> +
>  /** Path of hugepage info file. */
>  #define HUGEPAGE_INFO_FMT "%s/.%s_hugepage_info"
> 
> diff --git a/lib/librte_eal/common/eal_private.h 
> b/lib/librte_eal/common/eal_private.h
> index c46dd8f..e36e3b5 100644
> --- a/lib/librte_eal/common/eal_private.h
> +++ b/lib/librte_eal/common/eal_private.h
> @@ -195,4 +195,14 @@ int rte_eal_hugepage_attach(void);
>   */
>  struct rte_bus *rte_bus_find_by_device_name(const char *str);
> 
> +/**
> + * Create the unix channel for primary/secondary communication.
> + *
> + * @return
> + *   0 on success;
> + *   (<0) on failure.
> + */
> +
> +int rte_eal_mp_channel_init(void);
> +
>  #endif /* _EAL_PRIVATE_H_ */
> diff --git a/lib/librte_eal/common/include/rte_eal.h 
> b/lib/librte_eal/common/include/rte_eal.h
> index 02fa109..9884c0b 100644
> --- a/lib/librte_eal/common/include/rte_eal.h
> +++ b/lib/librte_eal/common/include/rte_eal.h
> @@ -186,6 +186,75 @@ int rte_eal_init(int argc, char **argv);
>  int rte_eal_primary_proc_alive(const char *config_file_path);
> 
>  /**
> + * Action function typedef used by other components.
> + *
> + * As we create unix socket channel for primary/secondary communication, use
> + * this function typedef to register action for coming messages.
> + */
> +typedef int (*rte_eal_mp_t)(const void *params, int len,
> +                         int fds[], int fds_num);
> +
> +/**
> + * Register an action function for primary/secondary communication.
> + *
> + * Call this function to register an action, if the calling component wants
> + * to response the messages from the corresponding component in its primary
> + * process or secondary processes.
> + *
> + * @param action_name
> + *   The action_name argument plays as the nonredundant key to find the 
> action.
> + *
> + * @param action
> + *   The action argument is the function pointer to the action function.
> + *
> + * @return
> + *  - 0 on success.
> + *  - (<0) on failure.
> + */
> +int rte_eal_mp_action_register(const char *action_name, rte_eal_mp_t action);
> +
> +/**
> + * Unregister an action function for primary/secondary communication.
> + *
> + * Call this function to unregister an action  if the calling component does
> + * not want to response the messages from the corresponding component in its
> + * primary process or secondary processes.
> + *
> + * @param action_name
> + *   The action_name argument plays as the nonredundant key to find the 
> action.
> + *
> + */
> +void rte_eal_mp_action_unregister(const char *name);
> +
> +/**
> + * Send a message to the peer process.
> + *
> + * This function will send a message which will be responsed by the action
> + * identified by action_name of the process on the other side.
> + *
> + * @param action_name
> + *   The action_name argument is used to identify which action will be used.
> + *
> + * @param params
> + *   The params argument contains the customized message.
> + *
> + * @param len_params
> + *   The len_params argument is the length of the customized message.
> + *
> + * @param fds
> + *   The fds argument is an array of fds sent with sendmsg.
> + *
> + * @param fds_num
> + *   The fds_num argument is number of fds to be sent with sendmsg.
> + *
> + * @return
> + *  - Returns the number of messages being sent successfully.
> + */
> +int
> +rte_eal_mp_sendmsg(const char *action_name, const void *params,
> +                int len_params, int fds[], int fds_num);
> +
> +/**
>   * Usage function typedef used by the application usage function.
>   *
>   * Use this function typedef to define and call 
> rte_set_application_usage_hook()
> diff --git a/lib/librte_eal/linuxapp/eal/eal.c 
> b/lib/librte_eal/linuxapp/eal/eal.c
> index 229eec9..f231724 100644
> --- a/lib/librte_eal/linuxapp/eal/eal.c
> +++ b/lib/librte_eal/linuxapp/eal/eal.c
> @@ -896,6 +896,14 @@ rte_eal_init(int argc, char **argv)
> 
>       eal_check_mem_on_local_socket();
> 
> +     if (rte_eal_mp_channel_init() < 0) {
> +             rte_eal_init_alert("failed to init mp channel\n");
> +             if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
> +                     rte_errno = EFAULT;
> +                     return -1;
> +             }
> +     }
> +
>       eal_thread_init_master(rte_config.master_lcore);
> 
>       ret = eal_thread_dump_affinity(cpuset, RTE_CPU_AFFINITY_STR_LEN);
> diff --git a/lib/librte_eal/rte_eal_version.map 
> b/lib/librte_eal/rte_eal_version.map
> index f4f46c1..5dacde5 100644
> --- a/lib/librte_eal/rte_eal_version.map
> +++ b/lib/librte_eal/rte_eal_version.map
> @@ -235,4 +235,13 @@ EXPERIMENTAL {
>       rte_service_set_stats_enable;
>       rte_service_start_with_defaults;
> 
> +} DPDK_17.08;
> +
> +DPDK_18.02 {
> +     global:
> +
> +     rte_eal_mp_action_register;
> +     rte_eal_mp_action_unregister;
> +     rte_eal_mp_sendmsg;
> +
>  } DPDK_17.11;
> --
> 2.7.4

Reply via email to