Changeset: 11f022e6b68c for MonetDB URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=11f022e6b68c Modified Files: common/stream/stream.c configure.ag monetdb5/modules/mal/mal_mapi.c tools/merovingian/daemon/client.c tools/merovingian/daemon/controlrunner.c tools/merovingian/daemon/discoveryrunner.c tools/merovingian/daemon/merovingian.c tools/merovingian/daemon/multiplex-funnel.c Branch: Apr2019 Log Message:
Use poll() instead of select() when available. This works around a limitation in the implementation of the FD_SET set of macros on Linux which limits the file descriptors that can be used to FD_SETSIZE (1024). Windows doesn't have this limitation, nor does poll(). diffs (truncated from 679 to 300 lines): diff --git a/common/stream/stream.c b/common/stream/stream.c --- a/common/stream/stream.c +++ b/common/stream/stream.c @@ -72,6 +72,9 @@ # include <netinet/tcp.h> # include <netdb.h> #endif +#ifdef HAVE_POLL_H +#include <poll.h> +#endif #ifdef NATIVE_WIN32 #include <io.h> @@ -2492,9 +2495,21 @@ socket_read(stream *restrict s, void *re #endif for (;;) { if (s->timeout) { + int ret; +#ifdef HAVE_POLL + struct pollfd pfd; + + pfd = (struct pollfd) {.fd = s->stream_data.s, + .events = POLLIN}; + + ret = poll(&pfd, 1, (int) s->timeout); + if (ret == -1 || (pfd.revents & POLLERR)) { + s->errnr = MNSTR_READ_ERROR; + return -1; + } +#else struct timeval tv; fd_set fds; - int ret; errno = 0; #ifdef _MSC_VER @@ -2515,6 +2530,7 @@ socket_read(stream *restrict s, void *re s->errnr = MNSTR_READ_ERROR; return -1; } +#endif if (ret == 0) { if (s->timeout_func == NULL || s->timeout_func()) { s->errnr = MNSTR_TIMEOUT; @@ -2523,7 +2539,11 @@ socket_read(stream *restrict s, void *re continue; } assert(ret == 1); +#ifdef HAVE_POLL + assert(pfd.revents & (POLLIN|POLLHUP)); +#else assert(FD_ISSET(s->stream_data.s, &fds)); +#endif } #ifdef _MSC_VER nr = recv(s->stream_data.s, buf, (int) size, 0); @@ -2620,9 +2640,20 @@ static int socket_isalive(stream *s) { SOCKET fd = s->stream_data.s; - char buffer[32]; +#ifdef HAVE_POLL + struct pollfd pfd; + int ret; + pfd = (struct pollfd){.fd = fd}; + if ((ret = poll(&pfd, 1, 0)) == 0) + return 1; + if (ret < 0 || pfd.revents & (POLLERR | POLLHUP)) + return 0; + assert(0); /* unexpected revents value */ + return 0; +#else fd_set fds; struct timeval t; + char buffer[32]; t.tv_sec = 0; t.tv_usec = 0; @@ -2636,6 +2667,7 @@ socket_isalive(stream *s) #endif &fds, NULL, NULL, &t) <= 0 || recv(fd, buffer, sizeof(buffer), MSG_PEEK | MSG_DONTWAIT) != 0; +#endif } static stream * diff --git a/configure.ag b/configure.ag --- a/configure.ag +++ b/configure.ag @@ -2396,6 +2396,7 @@ AC_CHECK_HEADERS([ \ mach-o/dyld.h \ netdb.h \ netinet/in.h \ + poll.h \ procfs.h \ pwd.h \ strings.h \ @@ -2605,6 +2606,7 @@ AC_CHECK_FUNCS([\ nl_langinfo \ _NSGetExecutablePath \ pipe2 \ + poll \ popen \ posix_fadvise \ posix_fallocate \ diff --git a/monetdb5/modules/mal/mal_mapi.c b/monetdb5/modules/mal/mal_mapi.c --- a/monetdb5/modules/mal/mal_mapi.c +++ b/monetdb5/modules/mal/mal_mapi.c @@ -58,6 +58,9 @@ # include <netdb.h> # include <netinet/in.h> #endif +#ifdef HAVE_POLL_H +#include <poll.h> +#endif #ifdef HAVE_SYS_UIO_H # include <sys/uio.h> #endif @@ -281,8 +284,13 @@ SERVERlistenThread(SOCKET *Sock) { char *msg = 0; int retval; +#ifdef HAVE_POLL + struct pollfd pfd[2]; + nfds_t npfd; +#else struct timeval tv; fd_set fds; +#endif SOCKET sock = INVALID_SOCKET; SOCKET usock = INVALID_SOCKET; SOCKET msgsock = INVALID_SOCKET; @@ -297,6 +305,17 @@ SERVERlistenThread(SOCKET *Sock) (void) ATOMIC_INC(&nlistener); do { +#ifdef HAVE_POLL + npfd = 0; + if (sock != INVALID_SOCKET) + pfd[npfd++] = (struct pollfd) {.fd = sock, .events = POLLIN}; +#ifdef HAVE_SYS_UN_H + if (usock != INVALID_SOCKET) + pfd[npfd++] = (struct pollfd) {.fd = usock, .events = POLLIN}; +#endif + /* Wait up to 0.025 seconds (0.001 if testing) */ + retval = poll(pfd, npfd, GDKdebug & FORCEMITOMASK ? 10 : 25); +#else FD_ZERO(&fds); if (sock != INVALID_SOCKET) FD_SET(sock, &fds); @@ -304,7 +323,7 @@ SERVERlistenThread(SOCKET *Sock) if (usock != INVALID_SOCKET) FD_SET(usock, &fds); #endif - /* Wait up to 0.025 seconds (0.01 if testing) */ + /* Wait up to 0.025 seconds (0.001 if testing) */ tv.tv_sec = 0; tv.tv_usec = GDKdebug & FORCEMITOMASK ? 10000 : 25000; @@ -315,6 +334,7 @@ SERVERlistenThread(SOCKET *Sock) msgsock = usock; #endif retval = select((int)msgsock + 1, &fds, NULL, NULL, &tv); +#endif if (ATOMIC_GET(&serverexiting) || GDKexiting()) break; if (retval == 0) { @@ -334,7 +354,13 @@ SERVERlistenThread(SOCKET *Sock) } continue; } - if (sock != INVALID_SOCKET && FD_ISSET(sock, &fds)) { + if (sock != INVALID_SOCKET && +#ifdef HAVE_POLL + (npfd > 0 && pfd[0].fd == sock && pfd[0].revents & POLLIN) +#else + FD_ISSET(sock, &fds) +#endif + ) { if ((msgsock = accept4(sock, (SOCKPTR)0, (socklen_t *)0, SOCK_CLOEXEC)) == INVALID_SOCKET) { if ( #ifdef _MSC_VER @@ -352,7 +378,14 @@ SERVERlistenThread(SOCKET *Sock) (void) fcntl(msgsock, F_SETFD, FD_CLOEXEC); #endif #ifdef HAVE_SYS_UN_H - } else if (usock != INVALID_SOCKET && FD_ISSET(usock, &fds)) { + } else if (usock != INVALID_SOCKET && +#ifdef HAVE_POLL + ((npfd > 0 && pfd[0].fd == usock && pfd[0].revents & POLLIN) || + (npfd > 1 && pfd[1].fd == usock && pfd[1].revents & POLLIN)) +#else + FD_ISSET(usock, &fds) +#endif + ) { struct msghdr msgh; struct iovec iov; char buf[1]; diff --git a/tools/merovingian/daemon/client.c b/tools/merovingian/daemon/client.c --- a/tools/merovingian/daemon/client.c +++ b/tools/merovingian/daemon/client.c @@ -14,6 +14,9 @@ #include <sys/un.h> #include <netdb.h> #include <netinet/in.h> +#ifdef HAVE_POLL_H +#include <poll.h> +#endif #ifdef HAVE_SYS_UIO_H # include <sys/uio.h> #endif @@ -45,7 +48,7 @@ struct threads { }; struct clientdata { int sock; - int isusock; + bool isusock; struct threads *self; }; @@ -70,7 +73,7 @@ handleClient(void *data) sabdb redirs[24]; /* do we need more? */ int r = 0; int sock; - char isusock; + bool isusock; struct threads *self; sock = ((struct clientdata *) data)->sock; @@ -413,16 +416,29 @@ acceptConnections(int sock, int usock) { char *msg; int retval; +#ifdef HAVE_POLL + struct pollfd pfd[2]; +#else fd_set fds; + struct timeval tv; +#endif int msgsock; void *e; - struct timeval tv; struct clientdata *data; struct threads *threads = NULL, **threadp, *p; int errnr; /* saved errno */ do { /* handle socket connections */ + bool isusock = false; + +#ifdef HAVE_POLL + pfd[0] = (struct pollfd) {.fd = sock, .events = POLLIN}; + pfd[1] = (struct pollfd) {.fd = usock, .events = POLLIN}; + + /* Wait up to 5 seconds */ + retval = poll(pfd, 2, 5000); +#else FD_ZERO(&fds); FD_SET(sock, &fds); FD_SET(usock, &fds); @@ -432,6 +448,7 @@ acceptConnections(int sock, int usock) tv.tv_usec = 0; retval = select((sock > usock ? sock : usock) + 1, &fds, NULL, NULL, &tv); +#endif errnr = errno; /* join any handleClient threads that we started and that may * have finished by now */ @@ -475,7 +492,14 @@ acceptConnections(int sock, int usock) } continue; } - if (FD_ISSET(sock, &fds)) { + if ( +#ifdef HAVE_POLL + pfd[0].revents & POLLIN +#else + FD_ISSET(sock, &fds) +#endif + ) { + isusock = false; if ((msgsock = accept4(sock, (SOCKPTR)0, (socklen_t *) 0, SOCK_CLOEXEC)) == -1) { if (_mero_keep_listening == 0) break; @@ -501,13 +525,20 @@ acceptConnections(int sock, int usock) #if defined(HAVE_FCNTL) && (!defined(SOCK_CLOEXEC) || !defined(HAVE_ACCEPT4)) (void) fcntl(msgsock, F_SETFD, FD_CLOEXEC); #endif - } else if (FD_ISSET(usock, &fds)) { + } else if ( +#ifdef HAVE_POLL + pfd[1].revents & POLLIN +#else + FD_ISSET(usock, &fds) +#endif + ) { struct msghdr msgh; struct iovec iov; char buf[1]; int rv; _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list