On 30-Nov-17 6:44 PM, Jianfeng Tan wrote:
We need the synchronous way for multi-process communication, that
is to say we need an immediate response after we send a message
to the other side.

We will stop the mp_handler thread, and after sending a message,
the sending thread will wait there for the response and process
it.

Suggested-by: Anatoly Burakov <anatoly.bura...@intel.com>
Signed-off-by: Jianfeng Tan <jianfeng....@intel.com>
---
  lib/librte_eal/common/eal_common_proc.c | 53 +++++++++++++++++++++++++++++++--
  lib/librte_eal/common/include/rte_eal.h |  5 +++-
  2 files changed, 55 insertions(+), 3 deletions(-)

diff --git a/lib/librte_eal/common/eal_common_proc.c 
b/lib/librte_eal/common/eal_common_proc.c
index 5d0a095..65ebaf2 100644
--- a/lib/librte_eal/common/eal_common_proc.c
+++ b/lib/librte_eal/common/eal_common_proc.c
@@ -30,6 +30,8 @@
   *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
   */
+#define _GNU_SOURCE
+

shouldn't this be in Makefile flags?

  #include <stdio.h>
  #include <fcntl.h>
  #include <stdlib.h>
@@ -41,6 +43,8 @@
  #include <sys/un.h>
  #include <errno.h>
  #include <pthread.h>
+#include <sys/eventfd.h>
+#include <signal.h>
#include <rte_log.h>
  #include <rte_eal.h>
@@ -134,6 +138,7 @@ rte_eal_mp_action_unregister(const char *name)
struct mp_fds {
        int efd;
+       int evfd; /* eventfd used for pausing mp_handler thread */
union {
                /* fds for primary process */
@@ -331,6 +336,13 @@ mp_handler(void *arg __rte_unused)
                exit(EXIT_FAILURE);
        }
+ ev.data.fd = mp_fds.evfd;
+       if (epoll_ctl(mp_fds.efd, EPOLL_CTL_ADD, ev.data.fd, &ev) < 0) {
+               RTE_LOG(ERR, EAL, "epoll_ctl failed: %s\n",
+                       strerror(errno));
+               exit(EXIT_FAILURE);

Here and in other places - should this be rte_exit() instead of exit()?

+       }
+
        events = calloc(20, sizeof ev);
while (1) {
@@ -348,6 +360,14 @@ mp_handler(void *arg __rte_unused)
                                continue;
                        }
+ if (events[i].data.fd == mp_fds.evfd) {
+                               RTE_LOG(INFO, EAL, "mp_handler thread will pause\n");
+                               pause();
+                               RTE_LOG(INFO, EAL, "mp_handler thread stops pausing\n");
+
+                               continue;
+                       }
+
                        fd = events[i].data.fd;
if ((events[i].events & EPOLLIN)) {
@@ -377,13 +397,14 @@ mp_handler(void *arg __rte_unused)
        return NULL;
  }
+static pthread_t tid;
+
  int
  rte_eal_mp_channel_init(void)
  {
        int i, fd, ret;
        const char *path;
        struct sockaddr_un un;
-       pthread_t tid;
        char thread_name[RTE_MAX_THREAD_NAME_LEN];
mp_fds.efd = epoll_create1(0);
@@ -462,6 +483,8 @@ rte_eal_mp_channel_init(void)
                return -1;
        }
+ mp_fds.evfd = eventfd(0, 0);
+
        return 0;
  }
@@ -485,7 +508,8 @@ rte_eal_mp_sendmsg(const char *action_name,
                   const void *params,
                   int len_params,
                   int fds[],
-                  int fds_num)
+                  int fds_num,
+                  int need_ack)

I think "need_ack" is a misnomer because what we really want is not "ack" but a response.

More importantly, I think that for clarity's sake this should be a separate function - something like rte_eal_mp_sendreq(), or maybe a better name (reqdata? communicate?). Also, I don't think reusing the send parameters is a good idea - a user who expects a response should allocate the buffer for the response separately from the request and pass it explicitly.

  {
        int i;
        int ret = 0;
@@ -511,6 +535,11 @@ rte_eal_mp_sendmsg(const char *action_name,
        RTE_LOG(INFO, EAL, "send msg: %s, %d\n", action_name, len_msg);
+       if (need_ack) {
+               // stop mp_handler thread.

Do we accept C++-style comments?

+               eventfd_write(mp_fds.evfd, (eventfd_t)1);
+       }
+
        msg = malloc(len_msg);
        if (!msg) {
                RTE_LOG(ERR, EAL, "Cannot alloc memory for msg\n");
@@ -547,12 +576,32 @@ rte_eal_mp_sendmsg(const char *action_name,
                        ret = send_msg(mp_fds.secondaries[i], &msgh);
                        if (ret < 0)
                                break;
+
+                       if (need_ack) {
+                               /* We will hang there until the other side
+                                * responses and what if other side is sending
+                                * msg at the same time?
+                                */
+                               process_msg(mp_fds.secondaries[i]);
+                       }
                }
        } else {
                ret = send_msg(mp_fds.primary, &msgh);
+
+               if (ret > 0 && need_ack) {
+                       // We will hang there until the other side responses
+                       ret = process_msg(mp_fds.primary);
+               }
        }
        free(msg);
+       if (need_ack) {
+               // start mp_handler thread.
+               union sigval value;

it's not used, but still, maybe zero-initialize it?

+
+               pthread_sigqueue(tid, 0, value);
+       }
+
        return ret;
  }
diff --git a/lib/librte_eal/common/include/rte_eal.h 
b/lib/librte_eal/common/include/rte_eal.h
index 8776bcf..9875cae 100644
--- a/lib/librte_eal/common/include/rte_eal.h
+++ b/lib/librte_eal/common/include/rte_eal.h
@@ -274,13 +274,16 @@ void rte_eal_mp_action_unregister(const char *name);
   * @param fds_num
   *   The fds_num argument is number of fds to be sent with sendmsg.
   *
+ * @param need_ack
+ *   If non-zero, wait for and process a response from the peer after sending.
+ *
   * @return
   *  - (>=0) on success.
   *  - (<0) on failure.
   */
  int
  rte_eal_mp_sendmsg(const char *action_name, const void *params,
-                  int len_params, int fds[], int fds_num);
+                  int len_params, int fds[], int fds_num, int need_ack);
/**
   * Usage function typedef used by the application usage function.



--
Thanks,
Anatoly

Reply via email to