Previously, there was only one way for primary/secondary to exchange
messages, that is, the primary process writes info into some predefined
file, and the secondary process reads the info out. That cannot address
the following requirements:
  a. Secondary wants to send info to primary.
  b. Sending info at any time, instead of just initialization time.
  c. Share FD with the other side.

This patch proposes to create a communication channel (as a unix
socket connection) for the above requirements.

Three new APIs are added:

  1. rte_eal_primary_secondary_add_action is used to register an action,
if the calling component wants to respond to the messages from the
corresponding component in its primary process or secondary processes.
  2. rte_eal_primary_secondary_del_action is used to unregister the
action if the calling component no longer wants to respond to the messages.
  3. rte_eal_primary_secondary_sendmsg is used to send a message.

Signed-off-by: Jianfeng Tan <jianfeng....@intel.com>
---
 lib/librte_eal/bsdapp/eal/rte_eal_version.map   |   8 +
 lib/librte_eal/common/eal_common_proc.c         | 454 ++++++++++++++++++++++++
 lib/librte_eal/common/eal_filesystem.h          |  18 +
 lib/librte_eal/common/eal_private.h             |  10 +
 lib/librte_eal/common/include/rte_eal.h         |  74 ++++
 lib/librte_eal/linuxapp/eal/eal.c               |   6 +
 lib/librte_eal/linuxapp/eal/rte_eal_version.map |   8 +
 7 files changed, 578 insertions(+)

diff --git a/lib/librte_eal/bsdapp/eal/rte_eal_version.map 
b/lib/librte_eal/bsdapp/eal/rte_eal_version.map
index aac6fd7..f4ff29f 100644
--- a/lib/librte_eal/bsdapp/eal/rte_eal_version.map
+++ b/lib/librte_eal/bsdapp/eal/rte_eal_version.map
@@ -237,3 +237,11 @@ EXPERIMENTAL {
        rte_service_unregister;
 
 } DPDK_17.08;
+
+EXPERIMENTAL {
+       global:
+
+       rte_eal_primary_secondary_add_action;
+       rte_eal_primary_secondary_del_action;
+       rte_eal_primary_secondary_sendmsg;
+} DPDK_17.11;
diff --git a/lib/librte_eal/common/eal_common_proc.c 
b/lib/librte_eal/common/eal_common_proc.c
index 60526ca..fa316bf 100644
--- a/lib/librte_eal/common/eal_common_proc.c
+++ b/lib/librte_eal/common/eal_common_proc.c
@@ -33,8 +33,20 @@
 #include <stdio.h>
 #include <fcntl.h>
 #include <stdlib.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/epoll.h>
+#include <limits.h>
+#include <unistd.h>
+#include <sys/un.h>
+#include <errno.h>
+#include <pthread.h>
+
+#include <rte_log.h>
 #include <rte_eal.h>
+#include <rte_lcore.h>
 
+#include "eal_private.h"
 #include "eal_filesystem.h"
 #include "eal_internal_cfg.h"
 
@@ -59,3 +71,445 @@ rte_eal_primary_proc_alive(const char *config_file_path)
 
        return !!ret;
 }
+
+struct action_entry {
+       TAILQ_ENTRY(action_entry) next;      /**< Next attached action entry. */
+
+#define MAX_ACTION_NAME_LEN    64
+       char action_name[MAX_ACTION_NAME_LEN]; /**< Key the action is looked up by. */
+       rte_eal_primary_secondary_t *action;   /**< Callback run for matching messages. */
+};
+
+/** Doubly linked list of registered actions. */
+TAILQ_HEAD(action_entry_list, action_entry);
+
+static struct action_entry_list action_entry_list =
+       TAILQ_HEAD_INITIALIZER(action_entry_list);
+
+/**
+ * Look up a registered action by its exact name.
+ *
+ * @param name
+ *   Action name to search for. It is read for at most
+ *   MAX_ACTION_NAME_LEN bytes, so it does not have to be NUL-terminated
+ *   when it fills the whole field.
+ * @return
+ *   The matching entry, or NULL if no action with that name is registered.
+ */
+static struct action_entry *
+find_action_entry_by_name(const char *name)
+{
+       struct action_entry *entry;
+
+       TAILQ_FOREACH(entry, &action_entry_list, next) {
+               /*
+                * Compare over the full field width. Limiting the compare
+                * to strlen(name) would be a prefix match, letting "foo"
+                * select an action registered as "foobar".
+                */
+               if (strncmp(entry->action_name, name,
+                           MAX_ACTION_NAME_LEN) == 0)
+                       break;
+       }
+
+       return entry;
+}
+
+/*
+ * Register a callback for messages carrying the given action name.
+ *
+ * Returns 0 on success, -EINVAL on a NULL argument or a name that does not
+ * fit in MAX_ACTION_NAME_LEN (including the terminating NUL), -EEXIST if the
+ * name is already registered, and -ENOMEM if the entry cannot be allocated.
+ *
+ * NOTE(review): the action list is also walked by the channel thread and no
+ * lock is visible here; confirm callers only register before traffic starts.
+ */
+int
+rte_eal_primary_secondary_add_action(const char *action_name,
+                                    rte_eal_primary_secondary_t action)
+{
+       struct action_entry *entry;
+
+       if (action_name == NULL || action == NULL)
+               return -EINVAL;
+
+       /* The name must fit with its NUL so lookups and logging stay safe. */
+       if (strlen(action_name) >= MAX_ACTION_NAME_LEN)
+               return -EINVAL;
+
+       /* The name is the lookup key; a second entry would never be reached. */
+       TAILQ_FOREACH(entry, &action_entry_list, next) {
+               if (strcmp(entry->action_name, action_name) == 0)
+                       return -EEXIST;
+       }
+
+       entry = malloc(sizeof(*entry));
+       if (entry == NULL)
+               return -ENOMEM;
+
+       snprintf(entry->action_name, sizeof(entry->action_name), "%s",
+                action_name);
+       entry->action = action;
+       TAILQ_INSERT_TAIL(&action_entry_list, entry, next);
+       return 0;
+}
+
+/*
+ * Unregister the action registered under the given name.
+ *
+ * Does nothing when name is NULL or no such action is registered.
+ */
+void
+rte_eal_primary_secondary_del_action(const char *name)
+{
+       struct action_entry *entry;
+
+       if (name == NULL)
+               return;
+
+       entry = find_action_entry_by_name(name);
+       if (entry == NULL)
+               return; /* unknown name: TAILQ_REMOVE on NULL would crash */
+
+       TAILQ_REMOVE(&action_entry_list, entry, next);
+       free(entry);
+}
+
+#define MAX_SECONDARY_PROCS    8 /* number of slots in fds_to_sec[] */
+
+static int efd_pri_sec; /* epoll fd for primary/secondary channel thread */
+static int fd_listen;   /* unix listen socket by primary */
+static int fd_to_pri;   /* only used by secondary process */
+static int fds_to_sec[MAX_SECONDARY_PROCS]; /* primary: fds of accepted secondaries, -1 marks a free slot */
+
+/* Wire header placed in front of every message sent over the channel. */
+struct msg_hdr {
+       char action_name[MAX_ACTION_NAME_LEN]; /* selects the receiver's action */
+       int fds_num;                           /* number of fds the sender attached */
+       char params[];                         /* payload; C99 flexible array member */
+} __rte_packed;
+
+/*
+ * Store fd in the first free slot (-1) of fds_to_sec[].
+ * Returns the slot index, or -1 when every slot is taken.
+ */
+static int
+add_sec_proc(int fd)
+{
+       int slot = 0;
+
+       while (slot < MAX_SECONDARY_PROCS && fds_to_sec[slot] != -1)
+               slot++;
+
+       if (slot == MAX_SECONDARY_PROCS)
+               return -1;
+
+       fds_to_sec[slot] = fd;
+       return slot;
+}
+
+/* Mark the first fds_to_sec[] slot holding fd as free (-1), if there is one. */
+static void
+del_sec_proc(int fd)
+{
+       int *slot;
+
+       for (slot = fds_to_sec; slot != fds_to_sec + MAX_SECONDARY_PROCS;
+            slot++) {
+               if (*slot != fd)
+                       continue;
+
+               *slot = -1;
+               return;
+       }
+}
+
+/*
+ * Receive one message and any file descriptors passed along with it.
+ *
+ * sockfd   connected unix socket to read from
+ * buf      receives the message bytes (at most buflen)
+ * fds      receives up to fds_num descriptors carried as SCM_RIGHTS
+ *
+ * Returns the number of bytes received, 0 when the peer has closed the
+ * connection, or a negative value on error or on a truncated message.
+ */
+static int
+read_msg(int sockfd, char *buf, int buflen, int *fds, int fds_num)
+{
+       struct iovec iov;
+       struct msghdr msgh;
+       size_t fdsize = fds_num > 0 ? fds_num * sizeof(int) : 0;
+       char control[CMSG_SPACE(fdsize)];
+       struct cmsghdr *cmsg;
+       size_t payload;
+       int ret;
+
+       iov.iov_base = buf;
+       iov.iov_len  = buflen;
+
+       do {
+               /* Rebuild the header each try: recvmsg() writes into it. */
+               memset(&msgh, 0, sizeof(msgh));
+               msgh.msg_iov = &iov;
+               msgh.msg_iovlen = 1;
+               msgh.msg_control = control;
+               msgh.msg_controllen = sizeof(control);
+
+               ret = recvmsg(sockfd, &msgh, 0);
+       } while (ret < 0 && errno == EINTR);
+
+       if (ret < 0) {
+               RTE_LOG(ERR, EAL, "recvmsg failed\n");
+               return ret;
+       }
+
+       /* Orderly shutdown by the peer is not an error; let the caller see 0. */
+       if (ret == 0)
+               return 0;
+
+       if (msgh.msg_flags & (MSG_TRUNC | MSG_CTRUNC)) {
+               RTE_LOG(ERR, EAL, "truncated msg\n");
+               return -1;
+       }
+
+       for (cmsg = CMSG_FIRSTHDR(&msgh); cmsg != NULL;
+               cmsg = CMSG_NXTHDR(&msgh, cmsg)) {
+               if ((cmsg->cmsg_level == SOL_SOCKET) &&
+                       (cmsg->cmsg_type == SCM_RIGHTS)) {
+                       if (cmsg->cmsg_len < CMSG_LEN(0))
+                               break;
+
+                       /* Copy only the fds that actually arrived. */
+                       payload = cmsg->cmsg_len - CMSG_LEN(0);
+                       if (payload > fdsize)
+                               payload = fdsize;
+                       memcpy(fds, CMSG_DATA(cmsg), payload);
+                       break;
+               }
+       }
+
+       return ret;
+}
+
+/*
+ * Read one message from fd and hand it to the action named in its header.
+ *
+ * Returns the action's return value, or -1 when the message cannot be read,
+ * is malformed, or names an action that is not registered.
+ */
+static int
+process_msg(int fd)
+{
+       int i;
+       int len;
+       int params_len;
+       char buf[1024];
+       int fds[8]; /* accept at most 8 FDs per message */
+       const int max_fds = sizeof(fds) / sizeof(fds[0]);
+       struct msg_hdr *hdr;
+       struct action_entry *entry;
+
+       /* Slots the peer did not fill must not look like valid descriptors. */
+       for (i = 0; i < max_fds; i++)
+               fds[i] = -1;
+
+       len = read_msg(fd, buf, sizeof(buf), fds, max_fds);
+       if (len < 0) {
+               RTE_LOG(ERR, EAL, "failed to read message: %s\n",
+                       strerror(errno));
+               return -1;
+       }
+
+       /* Nothing, or less than a header: there is no message to dispatch. */
+       if ((size_t)len < sizeof(struct msg_hdr)) {
+               RTE_LOG(ERR, EAL, "incomplete message: %d bytes\n", len);
+               return -1;
+       }
+
+       hdr = (struct msg_hdr *) buf;
+
+       /* The header comes from another process; do not trust its contents. */
+       hdr->action_name[MAX_ACTION_NAME_LEN - 1] = '\0';
+       if (hdr->fds_num < 0 || hdr->fds_num > max_fds) {
+               RTE_LOG(ERR, EAL, "invalid number of fds: %d\n",
+                       hdr->fds_num);
+               return -1;
+       }
+
+       entry = find_action_entry_by_name(hdr->action_name);
+       if (entry == NULL) {
+               RTE_LOG(ERR, EAL, "cannot find action by: %s\n",
+                       hdr->action_name);
+               return -1;
+       }
+
+       params_len = len - sizeof(struct msg_hdr);
+       return entry->action(hdr->params, params_len, fds, hdr->fds_num);
+}
+
+/*
+ * Channel thread of the primary process.
+ *
+ * Waits on the epoll instance for new connections on the listen socket and
+ * for messages or hang-ups on the accepted secondary connections. It never
+ * returns; setup failures terminate the process.
+ */
+static void *
+thread_primary(__attribute__((unused)) void *arg)
+{
+       int fd;
+       int i, n;
+       struct epoll_event event;
+       struct epoll_event *events;
+
+       event.events = EPOLLIN | EPOLLRDHUP;
+       event.data.fd = fd_listen;
+       if (epoll_ctl(efd_pri_sec, EPOLL_CTL_ADD, fd_listen, &event) < 0) {
+               RTE_LOG(ERR, EAL, "failed to epoll_ctl: %s\n",
+                       strerror(errno));
+               exit(EXIT_FAILURE);
+       }
+
+       events = calloc(20, sizeof(event));
+       if (events == NULL) {
+               RTE_LOG(ERR, EAL, "failed to allocate epoll events\n");
+               exit(EXIT_FAILURE);
+       }
+
+       while (1) {
+               n = epoll_wait(efd_pri_sec, events, 20, -1);
+               for (i = 0; i < n; i++) {
+                       if (events[i].data.fd == fd_listen) {
+                               if (events[i].events != EPOLLIN) {
+                                       RTE_LOG(ERR, EAL, "what happens?\n");
+                                       exit(EXIT_FAILURE);
+                               }
+
+                               fd = accept(fd_listen, NULL, NULL);
+                               if (fd < 0) {
+                                       RTE_LOG(ERR, EAL,
+                                               "primary failed to accept: %s\n",
+                                               strerror(errno));
+                                       continue;
+                               }
+
+                               /*
+                                * Reserve the slot first so that a connection
+                                * we cannot track is never left open or
+                                * watched by epoll.
+                                */
+                               if (add_sec_proc(fd) < 0) {
+                                       RTE_LOG(ERR, EAL,
+                                               "too many secondary processes\n");
+                                       close(fd);
+                                       continue;
+                               }
+
+                               event.data.fd = fd;
+                               if (epoll_ctl(efd_pri_sec, EPOLL_CTL_ADD, fd,
+                                             &event) < 0) {
+                                       RTE_LOG(ERR, EAL,
+                                               "failed to add secondary: %s\n",
+                                               strerror(errno));
+                                       del_sec_proc(fd);
+                                       close(fd);
+                               }
+
+                               continue;
+                       }
+
+                       fd = events[i].data.fd;
+
+                       /*
+                        * EPOLLRDHUP is requested above, so treat it as a
+                        * disconnect too; otherwise a half-closed peer keeps
+                        * the fd readable forever with zero-length reads.
+                        */
+                       if ((events[i].events &
+                            (EPOLLERR | EPOLLHUP | EPOLLRDHUP))) {
+                               RTE_LOG(ERR, EAL,
+                                       "secondary process exit: %d\n", fd);
+                               epoll_ctl(efd_pri_sec, EPOLL_CTL_DEL, fd, NULL);
+                               del_sec_proc(fd);
+                               close(fd); /* the accepted fd is ours to release */
+                               continue;
+                       }
+
+                       if ((events[i].events & EPOLLIN)) {
+                               RTE_LOG(INFO, EAL,
+                                       "recv msg from secondary process\n");
+
+                               process_msg(fd);
+                       }
+               }
+       }
+
+       return NULL;
+}
+
+/*
+ * Channel thread of a secondary process.
+ *
+ * Waits on the epoll instance for messages from the primary and dispatches
+ * them. It never returns; the process exits when the connection to the
+ * primary reports an error or hang-up.
+ */
+static void *
+thread_secondary(__attribute__((unused)) void *arg)
+{
+       int fd;
+       int i, n;
+       struct epoll_event event;
+       struct epoll_event *events;
+
+       event.events = EPOLLIN | EPOLLRDHUP;
+       event.data.fd = fd_to_pri;
+       if (epoll_ctl(efd_pri_sec, EPOLL_CTL_ADD, fd_to_pri, &event) < 0) {
+               RTE_LOG(ERR, EAL, "failed to epoll_ctl: %s\n", strerror(errno));
+               exit(EXIT_FAILURE);
+       }
+
+       events = calloc(20, sizeof(event));
+       if (events == NULL) {
+               RTE_LOG(ERR, EAL, "failed to allocate epoll events\n");
+               exit(EXIT_FAILURE);
+       }
+
+       while (1) {
+               n = epoll_wait(efd_pri_sec, events, 20, -1);
+               for (i = 0; i < n; i++) {
+
+                       fd = events[i].data.fd;
+
+                       /*
+                        * EPOLLRDHUP is requested above; without handling it
+                        * a half-closed connection would spin on empty reads.
+                        */
+                       if ((events[i].events &
+                            (EPOLLERR | EPOLLHUP | EPOLLRDHUP))) {
+                               RTE_LOG(ERR, EAL, "primary exits, so do I\n");
+                               /* Do we need exit secondary when primary exits? */
+                               exit(EXIT_FAILURE);
+                       }
+
+                       if ((events[i].events & EPOLLIN)) {
+                               RTE_LOG(INFO, EAL,
+                                       "recv msg from primary process\n");
+                               process_msg(fd);
+                       }
+               }
+       }
+
+       return NULL;
+}
+
+/*
+ * Create the primary/secondary channel and start its handling thread.
+ *
+ * The primary binds and listens on the unix socket path; a secondary
+ * connects to it. On any failure every descriptor opened here is closed
+ * again before returning.
+ *
+ * Returns 0 on success, -1 on failure.
+ */
+int
+rte_eal_primary_secondary_channel_init(void)
+{
+       int i, fd, ret;
+       const char *path;
+       struct sockaddr_un un;
+       pthread_t tid;
+       void*(*fn)(void *);
+       char thread_name[RTE_MAX_THREAD_NAME_LEN];
+
+       efd_pri_sec = epoll_create1(0);
+       if (efd_pri_sec < 0) {
+               RTE_LOG(ERR, EAL, "epoll_create1 failed\n");
+               return -1;
+       }
+
+       fd = socket(AF_UNIX, SOCK_STREAM, 0);
+       if (fd < 0) {
+               RTE_LOG(ERR, EAL, "Failed to create unix socket\n");
+               goto err_epoll;
+       }
+
+       memset(&un, 0, sizeof(un));
+       un.sun_family = AF_UNIX;
+       path = eal_primary_secondary_unix_path();
+       /* A silently truncated path would make both sides use a wrong file. */
+       if (strlen(path) >= sizeof(un.sun_path)) {
+               RTE_LOG(ERR, EAL, "unix socket path too long: %s\n", path);
+               goto err_sock;
+       }
+       snprintf(un.sun_path, sizeof(un.sun_path), "%s", path);
+
+       if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
+
+               for (i = 0; i < MAX_SECONDARY_PROCS; ++i)
+                       fds_to_sec[i] = -1;
+
+               if (fcntl(fd, F_SETFL, O_NONBLOCK) < 0) {
+                       RTE_LOG(ERR, EAL, "cannot set nonblocking mode\n");
+                       goto err_sock;
+               }
+
+               /* The file still exists since last run */
+               unlink(path);
+
+               ret = bind(fd, (struct sockaddr *)&un, sizeof(un));
+               if (ret < 0) {
+                       RTE_LOG(ERR, EAL, "failed to bind to %s: %s\n",
+                               path, strerror(errno));
+                       goto err_sock;
+               }
+               RTE_LOG(INFO, EAL, "primary bind to %s\n", path);
+
+               ret = listen(fd, 1024);
+               if (ret < 0) {
+                       RTE_LOG(ERR, EAL, "failed to listen: %s\n",
+                               strerror(errno));
+                       goto err_sock;
+               }
+
+               fn = thread_primary;
+               fd_listen = fd;
+       } else {
+               ret = connect(fd, (struct sockaddr *)&un, sizeof(un));
+               if (ret < 0) {
+                       RTE_LOG(ERR, EAL, "failed to connect primary\n");
+                       goto err_sock;
+               }
+               fn = thread_secondary;
+               fd_to_pri = fd;
+       }
+
+       /* pthread_create() returns the error number; it does not set errno. */
+       ret = pthread_create(&tid, NULL, fn, NULL);
+       if (ret != 0) {
+               RTE_LOG(ERR, EAL, "failed to create thread: %s\n",
+                       strerror(ret));
+               goto err_sock;
+       }
+
+       /*
+        * The thread is already running on fd and efd_pri_sec, so a naming
+        * failure must not close them; the name is cosmetic, only warn.
+        */
+       snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN,
+                "ps_channel");
+       ret = rte_thread_setname(tid, thread_name);
+       if (ret < 0)
+               RTE_LOG(WARNING, EAL, "failed to set thread name\n");
+
+       return 0;
+
+err_sock:
+       close(fd);
+err_epoll:
+       close(efd_pri_sec);
+       return -1;
+}
+
+/*
+ * Call sendmsg() on fd, repeating the call while it is interrupted (EINTR).
+ * Returns whatever the last sendmsg() call returned.
+ */
+static int
+send_msg(int fd, struct msghdr *p_msgh)
+{
+       int sent;
+
+       for (;;) {
+               sent = sendmsg(fd, p_msgh, 0);
+               if (sent >= 0 || errno != EINTR)
+                       return sent;
+       }
+}
+
+/*
+ * Send a message, optionally with file descriptors, to the other side.
+ *
+ * The primary sends to every connected secondary and stops at the first
+ * failure; a secondary sends to the primary.
+ *
+ * Returns the result of the last send (>= 0) on success, -EINVAL on invalid
+ * arguments, -E2BIG when the message or fd count exceeds what the receiving
+ * side reads in one message, -ENOMEM on allocation failure, or the negative
+ * result of the failing send.
+ */
+int
+rte_eal_primary_secondary_sendmsg(const char *action_name,
+                                 const void *params,
+                                 int len_params,
+                                 int fds[],
+                                 int fds_num)
+{
+       /* Limits of the receiver: 1024-byte buffer, 8 fds per message. */
+       enum { MAX_MSG_LEN = 1024, MAX_MSG_FDS = 8 };
+       int i;
+       int ret = 0;
+       struct msghdr msgh;
+       struct iovec iov;
+       size_t fd_size;
+       char control[CMSG_SPACE(MAX_MSG_FDS * sizeof(int))];
+       struct cmsghdr *cmsg;
+       struct msg_hdr *msg;
+       size_t len_msg;
+
+       if (action_name == NULL ||
+           strlen(action_name) >= MAX_ACTION_NAME_LEN)
+               return -EINVAL; /* would overflow msg->action_name */
+       if (len_params < 0 || (len_params > 0 && params == NULL))
+               return -EINVAL;
+       if (fds_num < 0 || (fds_num > 0 && fds == NULL))
+               return -EINVAL;
+       if (fds_num > MAX_MSG_FDS ||
+           (size_t)len_params > MAX_MSG_LEN - sizeof(struct msg_hdr))
+               return -E2BIG;
+
+       len_msg = sizeof(struct msg_hdr) + len_params;
+       msg = calloc(1, len_msg);
+       if (!msg) {
+               RTE_LOG(ERR, EAL, "Cannot alloc memory for msg\n");
+               return -ENOMEM;
+       }
+       snprintf(msg->action_name, sizeof(msg->action_name), "%s",
+                action_name);
+       msg->fds_num = fds_num;
+       if (len_params > 0)
+               memcpy(msg->params, params, len_params);
+
+       memset(&msgh, 0, sizeof(msgh));
+       memset(control, 0, sizeof(control));
+
+       iov.iov_base = msg;
+       iov.iov_len = len_msg;
+
+       msgh.msg_iov = &iov;
+       msgh.msg_iovlen = 1;
+
+       /* Attach ancillary data only when there are descriptors to pass. */
+       if (fds_num > 0) {
+               fd_size = fds_num * sizeof(int);
+               msgh.msg_control = control;
+               msgh.msg_controllen = CMSG_SPACE(fd_size);
+
+               cmsg = CMSG_FIRSTHDR(&msgh);
+               cmsg->cmsg_len = CMSG_LEN(fd_size);
+               cmsg->cmsg_level = SOL_SOCKET;
+               cmsg->cmsg_type = SCM_RIGHTS;
+               memcpy(CMSG_DATA(cmsg), fds, fd_size);
+       }
+
+       if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
+               for (i = 0; i < MAX_SECONDARY_PROCS; ++i) {
+                       if (fds_to_sec[i] == -1)
+                               continue;
+
+                       ret = send_msg(fds_to_sec[i], &msgh);
+                       if (ret < 0)
+                               break;
+               }
+       } else {
+               ret = send_msg(fd_to_pri, &msgh);
+       }
+
+       free(msg);
+
+       return ret;
+}
diff --git a/lib/librte_eal/common/eal_filesystem.h 
b/lib/librte_eal/common/eal_filesystem.h
index 8acbd99..78bb4fb 100644
--- a/lib/librte_eal/common/eal_filesystem.h
+++ b/lib/librte_eal/common/eal_filesystem.h
@@ -67,6 +67,24 @@ eal_runtime_config_path(void)
        return buffer;
 }
 
+/** Path of primary/secondary communication unix socket file. */
+#define PRIMARY_SECONDARY_UNIX_PATH_FMT "%s/.%s_unix"
+static inline const char *
+eal_primary_secondary_unix_path(void)
+{
+       static char buffer[PATH_MAX]; /* static so auto-zeroed */
+       const char *home_dir = getenv("HOME");
+       const char *directory;
+
+       /* Non-root users with HOME set get the socket under HOME. */
+       if (getuid() == 0 || home_dir == NULL)
+               directory = default_config_dir;
+       else
+               directory = home_dir;
+
+       snprintf(buffer, sizeof(buffer) - 1, PRIMARY_SECONDARY_UNIX_PATH_FMT,
+                directory, internal_config.hugefile_prefix);
+
+       return buffer;
+}
+
+
 /** Path of hugepage info file. */
 #define HUGEPAGE_INFO_FMT "%s/.%s_hugepage_info"
 
diff --git a/lib/librte_eal/common/eal_private.h 
b/lib/librte_eal/common/eal_private.h
index 597d82e..719b160 100644
--- a/lib/librte_eal/common/eal_private.h
+++ b/lib/librte_eal/common/eal_private.h
@@ -355,4 +355,14 @@ bool rte_eal_using_phys_addrs(void);
  */
 struct rte_bus *rte_bus_find_by_device_name(const char *str);
 
+/**
+ * Create the unix channel for primary/secondary communication.
+ *
+ * @return
+ *   0 on success;
+ *   (<0) on failure.
+ */
+
+int rte_eal_primary_secondary_channel_init(void);
+
 #endif /* _EAL_PRIVATE_H_ */
diff --git a/lib/librte_eal/common/include/rte_eal.h 
b/lib/librte_eal/common/include/rte_eal.h
index 0e7363d..6cfc9d6 100644
--- a/lib/librte_eal/common/include/rte_eal.h
+++ b/lib/librte_eal/common/include/rte_eal.h
@@ -210,6 +210,80 @@ int rte_eal_init(int argc, char **argv);
 int rte_eal_primary_proc_alive(const char *config_file_path);
 
 /**
+ * Action function typedef used by other components.
+ *
+ * As we create unix socket channel for primary/secondary communication, use
+ * this function typedef to register action for coming messages.
+ */
+typedef int (rte_eal_primary_secondary_t)(const char *params,
+                                         int len,
+                                         int fds[],
+                                         int fds_num);
+/**
+ * Register an action function for primary/secondary communication.
+ *
+ * Call this function to register an action, if the calling component wants
+ * to respond to the messages from the corresponding component in its primary
+ * process or secondary processes.
+ *
+ * @param action_name
+ *   The action_name argument is the unique key used to find the action.
+ *
+ * @param action
+ *   The action argument is the function pointer to the action function.
+ *
+ * @return
+ *  - 0 on success.
+ *  - (<0) on failure.
+ */
+int rte_eal_primary_secondary_add_action(const char *action_name,
+                                        rte_eal_primary_secondary_t action);
+/**
+ * Unregister an action function for primary/secondary communication.
+ *
+ * Call this function to unregister an action if the calling component no
+ * longer wants to respond to the messages from the corresponding component
+ * in its primary process or secondary processes.
+ *
+ * @param name
+ *   The name argument is the unique key used to find the action.
+ *
+ */
+void rte_eal_primary_secondary_del_action(const char *name);
+
+/**
+ * Send a message to the primary process or the secondary processes.
+ *
+ * This function will send a message which will be handled by the action
+ * identified by action_name in the process on the other side.
+ *
+ * @param action_name
+ *   The action_name argument is used to identify which action will be used.
+ *
+ * @param params
+ *   The params argument contains the customized message.
+ *
+ * @param len_params
+ *   The len_params argument is the length of the customized message.
+ *
+ * @param fds
+ *   The fds argument is an array of fds sent with sendmsg.
+ *
+ * @param fds_num
+ *   The fds_num argument is number of fds to be sent with sendmsg.
+ *
+ * @return
+ *  - (>=0) on success.
+ *  - (<0) on failure.
+ */
+int
+rte_eal_primary_secondary_sendmsg(const char *action_name,
+                                 const void *params,
+                                 int len_params,
+                                 int fds[],
+                                 int fds_num);
+
+/**
  * Usage function typedef used by the application usage function.
  *
  * Use this function typedef to define and call rte_set_applcation_usage_hook()
diff --git a/lib/librte_eal/linuxapp/eal/eal.c 
b/lib/librte_eal/linuxapp/eal/eal.c
index 48f12f4..237c0b1 100644
--- a/lib/librte_eal/linuxapp/eal/eal.c
+++ b/lib/librte_eal/linuxapp/eal/eal.c
@@ -873,6 +873,12 @@ rte_eal_init(int argc, char **argv)
 
        eal_check_mem_on_local_socket();
 
+       if (rte_eal_primary_secondary_channel_init() < 0) {
+               rte_eal_init_alert("Cannot create unix channel.");
+               rte_errno = EFAULT;
+               return -1;
+       }
+
        if (eal_plugins_init() < 0)
                rte_eal_init_alert("Cannot init plugins\n");
 
diff --git a/lib/librte_eal/linuxapp/eal/rte_eal_version.map 
b/lib/librte_eal/linuxapp/eal/rte_eal_version.map
index 3a8f154..c618aec 100644
--- a/lib/librte_eal/linuxapp/eal/rte_eal_version.map
+++ b/lib/librte_eal/linuxapp/eal/rte_eal_version.map
@@ -242,3 +242,11 @@ EXPERIMENTAL {
        rte_service_unregister;
 
 } DPDK_17.08;
+
+EXPERIMENTAL {
+       global:
+
+       rte_eal_primary_secondary_add_action;
+       rte_eal_primary_secondary_del_action;
+       rte_eal_primary_secondary_sendmsg;
+} DPDK_17.11;
-- 
2.7.4

Reply via email to