Hi Jiayu,

On 9/20/2017 11:00 AM, Jiayu Hu wrote:
Hi Jianfeng,

A few questions are inline.

Thanks,
Jiayu

On Fri, Aug 25, 2017 at 09:40:46AM +0000, Jianfeng Tan wrote:
Previously, there was only one way for primary/secondary to exchange
messages, that is, the primary process writes info into some predefined
file, and the secondary process reads the info out. That cannot address
the following requirements:
   a. Secondary wants to send info to primary.
   b. Sending info at any time, instead of just initialization time.
   c. Share FD with the other side.

This patch proposes to create a communication channel (as a unix
socket connection) for the above requirements.

Three new APIs are added:

   1. rte_eal_primary_secondary_add_action is used to register an action,
if the calling component wants to respond to messages from the
corresponding component in its primary process or secondary processes.
   2. rte_eal_primary_secondary_del_action is used to unregister the
action if the calling component does not want to respond to the messages.
   3. rte_eal_primary_secondary_sendmsg is used to send a message.

Signed-off-by: Jianfeng Tan <jianfeng....@intel.com>
---
  lib/librte_eal/bsdapp/eal/rte_eal_version.map   |   8 +
  lib/librte_eal/common/eal_common_proc.c         | 454 ++++++++++++++++++++++++
  lib/librte_eal/common/eal_filesystem.h          |  18 +
  lib/librte_eal/common/eal_private.h             |  10 +
  lib/librte_eal/common/include/rte_eal.h         |  74 ++++
  lib/librte_eal/linuxapp/eal/eal.c               |   6 +
  lib/librte_eal/linuxapp/eal/rte_eal_version.map |   8 +
  7 files changed, 578 insertions(+)

diff --git a/lib/librte_eal/bsdapp/eal/rte_eal_version.map 
b/lib/librte_eal/bsdapp/eal/rte_eal_version.map
index aac6fd7..f4ff29f 100644
--- a/lib/librte_eal/bsdapp/eal/rte_eal_version.map
+++ b/lib/librte_eal/bsdapp/eal/rte_eal_version.map
@@ -237,3 +237,11 @@ EXPERIMENTAL {
        rte_service_unregister;
} DPDK_17.08;
+
+EXPERIMENTAL {
+       global:
+
+       rte_eal_primary_secondary_add_action;
+       rte_eal_primary_secondary_del_action;
+       rte_eal_primary_secondary_sendmsg;
+} DPDK_17.11;
diff --git a/lib/librte_eal/common/eal_common_proc.c 
b/lib/librte_eal/common/eal_common_proc.c
index 60526ca..fa316bf 100644
--- a/lib/librte_eal/common/eal_common_proc.c
+++ b/lib/librte_eal/common/eal_common_proc.c
@@ -33,8 +33,20 @@
  #include <stdio.h>
  #include <fcntl.h>
  #include <stdlib.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/epoll.h>
+#include <limits.h>
+#include <unistd.h>
+#include <sys/un.h>
+#include <errno.h>
+#include <pthread.h>
+
+#include <rte_log.h>
  #include <rte_eal.h>
+#include <rte_lcore.h>
+#include "eal_private.h"
  #include "eal_filesystem.h"
  #include "eal_internal_cfg.h"
@@ -59,3 +71,445 @@ rte_eal_primary_proc_alive(const char *config_file_path) return !!ret;
  }
+
+struct action_entry {
+       TAILQ_ENTRY(action_entry) next;      /**< Next attached action entry*/
+
+#define MAX_ACTION_NAME_LEN    64
+       char action_name[MAX_ACTION_NAME_LEN];
+       rte_eal_primary_secondary_t *action;
+};
+
+/** Double linked list of actions. */
+TAILQ_HEAD(action_entry_list, action_entry);
+
+static struct action_entry_list action_entry_list =
+       TAILQ_HEAD_INITIALIZER(action_entry_list);
+
+static struct action_entry *
+find_action_entry_by_name(const char *name)
+{
+       int len = strlen(name);
+       struct action_entry *entry;
+
+       TAILQ_FOREACH(entry, &action_entry_list, next) {
+               if (strncmp(entry->action_name, name, len) == 0)
+                               break;
+       }
+
+       return entry;
+}
+
+int
+rte_eal_primary_secondary_add_action(const char *action_name,
+                                    rte_eal_primary_secondary_t action)
+{
+       struct action_entry *entry = malloc(sizeof(struct action_entry));
+
+       if (entry == NULL)
+               return -ENOMEM;
+
+       strncpy(entry->action_name, action_name, MAX_ACTION_NAME_LEN);
+       entry->action = action;
+       TAILQ_INSERT_TAIL(&action_entry_list, entry, next);
+       return 0;
+}
+
+void
+rte_eal_primary_secondary_del_action(const char *name)
+{
+       struct action_entry *entry = find_action_entry_by_name(name);
+
+       TAILQ_REMOVE(&action_entry_list, entry, next);
+       free(entry);
+}
+
+#define MAX_SECONDARY_PROCS    8
+
+static int efd_pri_sec; /* epoll fd for primary/secondary channel thread */
+static int fd_listen;   /* unix listen socket by primary */
+static int fd_to_pri;   /* only used by secondary process */
+static int fds_to_sec[MAX_SECONDARY_PROCS];
+
+struct msg_hdr {
+       char action_name[MAX_ACTION_NAME_LEN];
+       int fds_num;
+       char params[0];
+} __rte_packed;
+
+static int
+add_sec_proc(int fd)
+{
+       int i;
+
+       for (i = 0; i < MAX_SECONDARY_PROCS; ++i)
+               if (fds_to_sec[i] == -1)
+                       break;
+
+       if (i >= MAX_SECONDARY_PROCS)
+               return -1;
+
+       fds_to_sec[i] = fd;
+
+       return i;
+}
+
+static void
+del_sec_proc(int fd)
+{
+       int i;
+
+       for (i = 0; i < MAX_SECONDARY_PROCS; ++i) {
+               if (fds_to_sec[i] == fd) {
+                       fds_to_sec[i] = -1;
+                       break;
+               }
+       }
+}
+
+static int
+read_msg(int sockfd, char *buf, int buflen, int *fds, int fds_num)
+{
+       struct iovec iov;
+       struct msghdr msgh;
+       size_t fdsize = fds_num * sizeof(int);
+       char control[CMSG_SPACE(fdsize)];
+       struct cmsghdr *cmsg;
+       int ret;
+
+       memset(&msgh, 0, sizeof(msgh));
+       iov.iov_base = buf;
+       iov.iov_len  = buflen;
+
+       msgh.msg_iov = &iov;
+       msgh.msg_iovlen = 1;
+       msgh.msg_control = control;
+       msgh.msg_controllen = sizeof(control);
+
+       ret = recvmsg(sockfd, &msgh, 0);
+       if (ret <= 0) {
+               RTE_LOG(ERR, EAL, "recvmsg failed\n");
+               return ret;
+       }
+
+       if (msgh.msg_flags & (MSG_TRUNC | MSG_CTRUNC)) {
+               RTE_LOG(ERR, EAL, "truncted msg\n");
+               return -1;
+       }
+
+       for (cmsg = CMSG_FIRSTHDR(&msgh); cmsg != NULL;
+               cmsg = CMSG_NXTHDR(&msgh, cmsg)) {
+               if ((cmsg->cmsg_level == SOL_SOCKET) &&
+                       (cmsg->cmsg_type == SCM_RIGHTS)) {
+                       memcpy(fds, CMSG_DATA(cmsg), fdsize);
+                       break;
+               }
+       }
+
+       return ret;
+}
+
+static int
+process_msg(int fd)
+{
+       int len;
+       int params_len;
+       char buf[1024];
The max length of a message to receive is 1024 here, but the
senders don't know the limit. Would it be better to define a macro
for the max message length?

OK, let's make it a macro, and check the length when sending messages.


+       int fds[8]; /* accept at most 8 FDs per message */
+       struct msg_hdr *hdr;
+       struct action_entry *entry;
+
+       len = read_msg(fd, buf, 1024, fds, 8);
+       if (len < 0) {
+               RTE_LOG(ERR, EAL, "failed to read message: %s\n",
+                       strerror(errno));
+               return -1;
+       }
Why not check whether len is equal to 0?

Nice catch!


+
+       hdr = (struct msg_hdr *) buf;
+
+       entry = find_action_entry_by_name(hdr->action_name);
+       if (entry == NULL) {
+               RTE_LOG(ERR, EAL, "cannot find action by: %s\n",
+                       hdr->action_name);
+               return -1;
+       }
+
+       params_len = len - sizeof(struct msg_hdr);
+       return entry->action(hdr->params, params_len, fds, hdr->fds_num);
+}
+
+static void *
+thread_primary(__attribute__((unused)) void *arg)
+{
+       int fd;
+       int i, n;
+       struct epoll_event event;
+       struct epoll_event *events;
+
+       event.events = EPOLLIN | EPOLLRDHUP;
+       event.data.fd = fd_listen;
+       if (epoll_ctl(efd_pri_sec, EPOLL_CTL_ADD, fd_listen, &event) < 0) {
+               RTE_LOG(ERR, EAL, "failed to epoll_ctl: %s\n",
+                       strerror(errno));
+               exit(EXIT_FAILURE);
+       }
+
+       events = calloc(20, sizeof event);
A simple question: why is the max number of events 20?

This is another hard-coded value. It only decides how many events can be processed in each iteration; other events will be kept in the kernel for the next iteration.


+
+       while (1) {
+               n = epoll_wait(efd_pri_sec, events, 20, -1);
+               for (i = 0; i < n; i++) {
+                       if (events[i].data.fd == fd_listen) {
+                               if (events[i].events != EPOLLIN) {
+                                       RTE_LOG(ERR, EAL, "what happens?\n");
+                                       exit(EXIT_FAILURE);
+                               }
+
+                               fd = accept(fd_listen, NULL, NULL);
+                               if (fd < 0) {
+                                       RTE_LOG(ERR, EAL, "primary failed to accept: 
%s\n",
This line is beyond 80 characters.

Will fix it.
[...]
+
+                               continue;
+                       }
+
+                       fd = events[i].data.fd;
+
+                       if ((events[i].events & (EPOLLERR | EPOLLHUP))) {
+                               RTE_LOG(ERR, EAL,
+                                       "secondary process exit: %d\n", fd);
+                               epoll_ctl(efd_pri_sec, EPOLL_CTL_DEL, fd, NULL);
+                               del_sec_proc(fd);
Need close(fd) here?

Nice catch.


+                               continue;
+                       }
+
+                       if ((events[i].events & EPOLLIN)) {
+                               RTE_LOG(INFO, EAL,
+                                       "recv msg from secondary process\n");
+
+                               process_msg(fd);
+                       }
+               }
+       }
+
+       return NULL;
+}
+
+static void *
+thread_secondary(__attribute__((unused)) void *arg)
+{
+       int fd;
+       int i, n;
+       struct epoll_event event;
+       struct epoll_event *events;
+
+       event.events = EPOLLIN | EPOLLRDHUP;
+       event.data.fd = fd_to_pri;
+       if (epoll_ctl(efd_pri_sec, EPOLL_CTL_ADD, fd_to_pri, &event) < 0) {
+               RTE_LOG(ERR, EAL, "failed to epoll_ctl: %s\n", strerror(errno));
+               exit(EXIT_FAILURE);
+       }
+
+       events = calloc(20, sizeof event);
+
+       while (1) {
+               n = epoll_wait(efd_pri_sec, events, 20, -1);
+               for (i = 0; i < n; i++) {
+
+                       fd = events[i].data.fd;
+
+                       if ((events[i].events & (EPOLLERR | EPOLLHUP))) {
+                               RTE_LOG(ERR, EAL, "primary exits, so do I\n");
+                               /* Do we need exit secondary when primary 
exits? */
Need close(fd) here?

We will exit here, no need to close().

Thanks,
Jianfeng

Reply via email to