Changeset: 11f022e6b68c for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=11f022e6b68c
Modified Files:
        common/stream/stream.c
        configure.ag
        monetdb5/modules/mal/mal_mapi.c
        tools/merovingian/daemon/client.c
        tools/merovingian/daemon/controlrunner.c
        tools/merovingian/daemon/discoveryrunner.c
        tools/merovingian/daemon/merovingian.c
        tools/merovingian/daemon/multiplex-funnel.c
Branch: Apr2019
Log Message:

Use poll() instead of select() when available.
This works around a limitation of the FD_SET family of macros on
Linux, which can only handle file descriptors numbered below
FD_SETSIZE (1024).  Windows doesn't have this limitation, nor does
poll().


diffs (truncated from 679 to 300 lines):

diff --git a/common/stream/stream.c b/common/stream/stream.c
--- a/common/stream/stream.c
+++ b/common/stream/stream.c
@@ -72,6 +72,9 @@
 # include <netinet/tcp.h>
 # include <netdb.h>
 #endif
+#ifdef HAVE_POLL_H
+#include <poll.h>
+#endif
 
 #ifdef NATIVE_WIN32
 #include <io.h>
@@ -2492,9 +2495,21 @@ socket_read(stream *restrict s, void *re
 #endif
        for (;;) {
                if (s->timeout) {
+                       int ret;
+#ifdef HAVE_POLL
+                       struct pollfd pfd;
+
+                       pfd = (struct pollfd) {.fd = s->stream_data.s,
+                                              .events = POLLIN};
+
+                       ret = poll(&pfd, 1, (int) s->timeout);
+                       if (ret == -1 || (pfd.revents & POLLERR)) {
+                               s->errnr = MNSTR_READ_ERROR;
+                               return -1;
+                       }
+#else
                        struct timeval tv;
                        fd_set fds;
-                       int ret;
 
                        errno = 0;
 #ifdef _MSC_VER
@@ -2515,6 +2530,7 @@ socket_read(stream *restrict s, void *re
                                s->errnr = MNSTR_READ_ERROR;
                                return -1;
                        }
+#endif
                        if (ret == 0) {
                                if (s->timeout_func == NULL || s->timeout_func()) {
                                        s->errnr = MNSTR_TIMEOUT;
@@ -2523,7 +2539,11 @@ socket_read(stream *restrict s, void *re
                                continue;
                        }
                        assert(ret == 1);
+#ifdef HAVE_POLL
+                       assert(pfd.revents & (POLLIN|POLLHUP));
+#else
                        assert(FD_ISSET(s->stream_data.s, &fds));
+#endif
                }
 #ifdef _MSC_VER
                nr = recv(s->stream_data.s, buf, (int) size, 0);
@@ -2620,9 +2640,20 @@ static int
 socket_isalive(stream *s)
 {
        SOCKET fd = s->stream_data.s;
-       char buffer[32];
+#ifdef HAVE_POLL
+       struct pollfd pfd;
+       int ret;
+       pfd = (struct pollfd){.fd = fd};
+       if ((ret = poll(&pfd, 1, 0)) == 0)
+               return 1;
+       if (ret < 0 || pfd.revents & (POLLERR | POLLHUP))
+               return 0;
+       assert(0);              /* unexpected revents value */
+       return 0;
+#else
        fd_set fds;
        struct timeval t;
+       char buffer[32];
 
        t.tv_sec = 0;
        t.tv_usec = 0;
@@ -2636,6 +2667,7 @@ socket_isalive(stream *s)
 #endif
                &fds, NULL, NULL, &t) <= 0 ||
                recv(fd, buffer, sizeof(buffer), MSG_PEEK | MSG_DONTWAIT) != 0;
+#endif
 }
 
 static stream *
diff --git a/configure.ag b/configure.ag
--- a/configure.ag
+++ b/configure.ag
@@ -2396,6 +2396,7 @@ AC_CHECK_HEADERS([ \
        mach-o/dyld.h \
        netdb.h \
        netinet/in.h \
+       poll.h \
        procfs.h \
        pwd.h \
        strings.h \
@@ -2605,6 +2606,7 @@ AC_CHECK_FUNCS([\
        nl_langinfo \
        _NSGetExecutablePath \
        pipe2 \
+       poll \
        popen \
        posix_fadvise \
        posix_fallocate \
diff --git a/monetdb5/modules/mal/mal_mapi.c b/monetdb5/modules/mal/mal_mapi.c
--- a/monetdb5/modules/mal/mal_mapi.c
+++ b/monetdb5/modules/mal/mal_mapi.c
@@ -58,6 +58,9 @@
 # include <netdb.h>
 # include <netinet/in.h>
 #endif
+#ifdef HAVE_POLL_H
+#include <poll.h>
+#endif
 #ifdef HAVE_SYS_UIO_H
 # include <sys/uio.h>
 #endif
@@ -281,8 +284,13 @@ SERVERlistenThread(SOCKET *Sock)
 {
        char *msg = 0;
        int retval;
+#ifdef HAVE_POLL
+       struct pollfd pfd[2];
+       nfds_t npfd;
+#else
        struct timeval tv;
        fd_set fds;
+#endif
        SOCKET sock = INVALID_SOCKET;
        SOCKET usock = INVALID_SOCKET;
        SOCKET msgsock = INVALID_SOCKET;
@@ -297,6 +305,17 @@ SERVERlistenThread(SOCKET *Sock)
        (void) ATOMIC_INC(&nlistener);
 
        do {
+#ifdef HAVE_POLL
+               npfd = 0;
+               if (sock != INVALID_SOCKET)
+                       pfd[npfd++] = (struct pollfd) {.fd = sock, .events = POLLIN};
+#ifdef HAVE_SYS_UN_H
+               if (usock != INVALID_SOCKET)
+                       pfd[npfd++] = (struct pollfd) {.fd = usock, .events = POLLIN};
+#endif
+               /* Wait up to 0.025 seconds (0.01 if testing) */
+               retval = poll(pfd, npfd, GDKdebug & FORCEMITOMASK ? 10 : 25);
+#else
                FD_ZERO(&fds);
                if (sock != INVALID_SOCKET)
                        FD_SET(sock, &fds);
@@ -304,7 +323,7 @@ SERVERlistenThread(SOCKET *Sock)
                if (usock != INVALID_SOCKET)
                        FD_SET(usock, &fds);
 #endif
-               /* Wait up to 0.025 seconds (0.01 if testing) */
+               /* Wait up to 0.025 seconds (0.01 if testing) */
                tv.tv_sec = 0;
                tv.tv_usec = GDKdebug & FORCEMITOMASK ? 10000 : 25000;
 
@@ -315,6 +334,7 @@ SERVERlistenThread(SOCKET *Sock)
                        msgsock = usock;
 #endif
                retval = select((int)msgsock + 1, &fds, NULL, NULL, &tv);
+#endif
                if (ATOMIC_GET(&serverexiting) || GDKexiting())
                        break;
                if (retval == 0) {
@@ -334,7 +354,13 @@ SERVERlistenThread(SOCKET *Sock)
                        }
                        continue;
                }
-               if (sock != INVALID_SOCKET && FD_ISSET(sock, &fds)) {
+               if (sock != INVALID_SOCKET &&
+#ifdef HAVE_POLL
+                       (npfd > 0 && pfd[0].fd == sock && pfd[0].revents & POLLIN)
+#else
+                       FD_ISSET(sock, &fds)
+#endif
+                       ) {
                        if ((msgsock = accept4(sock, (SOCKPTR)0, (socklen_t *)0, SOCK_CLOEXEC)) == INVALID_SOCKET) {
                                if (
 #ifdef _MSC_VER
@@ -352,7 +378,14 @@ SERVERlistenThread(SOCKET *Sock)
                        (void) fcntl(msgsock, F_SETFD, FD_CLOEXEC);
 #endif
 #ifdef HAVE_SYS_UN_H
-               } else if (usock != INVALID_SOCKET && FD_ISSET(usock, &fds)) {
+               } else if (usock != INVALID_SOCKET &&
+#ifdef HAVE_POLL
+                                  ((npfd > 0 && pfd[0].fd == usock && pfd[0].revents & POLLIN) ||
+                                       (npfd > 1 && pfd[1].fd == usock && pfd[1].revents & POLLIN))
+#else
+                                  FD_ISSET(usock, &fds)
+#endif
+                       ) {
                        struct msghdr msgh;
                        struct iovec iov;
                        char buf[1];
diff --git a/tools/merovingian/daemon/client.c b/tools/merovingian/daemon/client.c
--- a/tools/merovingian/daemon/client.c
+++ b/tools/merovingian/daemon/client.c
@@ -14,6 +14,9 @@
 #include <sys/un.h>
 #include <netdb.h>
 #include <netinet/in.h>
+#ifdef HAVE_POLL_H
+#include <poll.h>
+#endif
 #ifdef HAVE_SYS_UIO_H
 # include <sys/uio.h>
 #endif
@@ -45,7 +48,7 @@ struct threads {
 };
 struct clientdata {
        int sock;
-       int isusock;
+       bool isusock;
        struct threads *self;
 };
 
@@ -70,7 +73,7 @@ handleClient(void *data)
        sabdb redirs[24];  /* do we need more? */
        int r = 0;
        int sock;
-       char isusock;
+       bool isusock;
        struct threads *self;
 
        sock = ((struct clientdata *) data)->sock;
@@ -413,16 +416,29 @@ acceptConnections(int sock, int usock)
 {
        char *msg;
        int retval;
+#ifdef HAVE_POLL
+       struct pollfd pfd[2];
+#else
        fd_set fds;
+       struct timeval tv;
+#endif
        int msgsock;
        void *e;
-       struct timeval tv;
        struct clientdata *data;
        struct threads *threads = NULL, **threadp, *p;
        int errnr;                                      /* saved errno */
 
        do {
                /* handle socket connections */
+               bool isusock = false;
+
+#ifdef HAVE_POLL
+               pfd[0] = (struct pollfd) {.fd = sock, .events = POLLIN};
+               pfd[1] = (struct pollfd) {.fd = usock, .events = POLLIN};
+
+               /* Wait up to 5 seconds */
+               retval = poll(pfd, 2, 5000);
+#else
                FD_ZERO(&fds);
                FD_SET(sock, &fds);
                FD_SET(usock, &fds);
@@ -432,6 +448,7 @@ acceptConnections(int sock, int usock)
                tv.tv_usec = 0;
                retval = select((sock > usock ? sock : usock) + 1,
                                &fds, NULL, NULL, &tv);
+#endif
                errnr = errno;
                /* join any handleClient threads that we started and that may
                 * have finished by now */
@@ -475,7 +492,14 @@ acceptConnections(int sock, int usock)
                        }
                        continue;
                }
-               if (FD_ISSET(sock, &fds)) {
+               if (
+#ifdef HAVE_POLL
+                       pfd[0].revents & POLLIN
+#else
+                       FD_ISSET(sock, &fds)
+#endif
+                       ) {
+                       isusock = false;
                        if ((msgsock = accept4(sock, (SOCKPTR)0, (socklen_t *) 0, SOCK_CLOEXEC)) == -1) {
                                if (_mero_keep_listening == 0)
                                        break;
@@ -501,13 +525,20 @@ acceptConnections(int sock, int usock)
 #if defined(HAVE_FCNTL) && (!defined(SOCK_CLOEXEC) || !defined(HAVE_ACCEPT4))
                        (void) fcntl(msgsock, F_SETFD, FD_CLOEXEC);
 #endif
-               } else if (FD_ISSET(usock, &fds)) {
+               } else if (
+#ifdef HAVE_POLL
+                       pfd[1].revents & POLLIN
+#else
+                       FD_ISSET(usock, &fds)
+#endif
+                       ) {
                        struct msghdr msgh;
                        struct iovec iov;
                        char buf[1];
                        int rv;
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to