Changeset: 475af50e54c4 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=475af50e54c4
Branch: Oct2020
Log Message:

merged


diffs (truncated from 707 to 300 lines):

diff --git a/common/stream/Tests/All b/common/stream/Tests/All
--- a/common/stream/Tests/All
+++ b/common/stream/Tests/All
@@ -1,13 +1,13 @@
 read_uncompressed
 HAVE_LIBBZ2?read_bz2
 HAVE_LIBZ?read_gz
-HAVE_LIBLZ4?read_lz4
+HAVE_LIBLZ4&HAVE_PYTHON_LZ4?read_lz4
 HAVE_LIBLZMA?read_xz
 
 write_uncompressed
 HAVE_LIBBZ2?write_bz2
 HAVE_LIBZ?write_gz
-HAVE_LIBLZ4?write_lz4
+HAVE_LIBLZ4&HAVE_PYTHON_LZ4?write_lz4
 HAVE_LIBLZMA?write_xz
 
 urlstream
diff --git a/common/stream/Tests/testdata.py b/common/stream/Tests/testdata.py
--- a/common/stream/Tests/testdata.py
+++ b/common/stream/Tests/testdata.py
@@ -2,10 +2,7 @@
 
 # Generating test files and verifying them after transmission.
 
-import bz2
 import gzip
-import lz4.frame
-import lzma
 
 import os
 import sys
@@ -208,10 +205,13 @@ class TestFile:
         elif self.compression == 'gz':
             f = gzip.GzipFile(filename, 'wb', fileobj=fileobj, mtime=131875200, compresslevel=1)
         elif self.compression == 'bz2':
+            import bz2
             f = bz2.BZ2File(fileobj, 'wb', compresslevel=1)
         elif self.compression == 'xz':
+            import lzma
             f = lzma.LZMAFile(fileobj, 'wb', preset=1)
         elif self.compression == 'lz4': # ok
+            import lz4.frame
             f = lz4.frame.LZ4FrameFile(fileobj, 'wb', compression_level=1)
         else:
             raise Exception("Unknown compression scheme: " + self.compression)
@@ -225,10 +225,13 @@ class TestFile:
         elif self.compression == 'gz':
             f = gzip.GzipFile(filename, 'rb', mtime=131875200)
         elif self.compression == 'bz2':
+            import bz2
             f = bz2.BZ2File(filename, 'rb')
         elif self.compression == 'xz':
+            import lzma
             f = lzma.LZMAFile(filename, 'rb')
         elif self.compression == 'lz4':
+            import lz4.frame
             f = lz4.frame.LZ4FrameFile(filename, 'rb')
         else:
             raise Exception("Unknown compression scheme: " + self.compression)
diff --git a/monetdb5/modules/mal/mal_mapi.c b/monetdb5/modules/mal/mal_mapi.c
--- a/monetdb5/modules/mal/mal_mapi.c
+++ b/monetdb5/modules/mal/mal_mapi.c
@@ -272,60 +272,51 @@ static ATOMIC_TYPE threadno = ATOMIC_VAR
 static void
 SERVERlistenThread(SOCKET *Sock)
 {
-       char *msg = 0;
+       char *msg = NULL;
        int retval;
-       SOCKET sock = Sock[0];
-       SOCKET usock = Sock[1];
-       SOCKET msgsock = INVALID_SOCKET;
+       SOCKET socks[3] = {Sock[0], Sock[1], Sock[2]};
        struct challengedata *data;
        MT_Id tid;
        stream *s;
+       int i;
 
        GDKfree(Sock);
 
        (void) ATOMIC_INC(&nlistener);
 
        do {
+               SOCKET msgsock = INVALID_SOCKET;
 #ifdef HAVE_POLL
-               struct pollfd pfd[2];
+               struct pollfd pfd[3];
                nfds_t npfd;
                npfd = 0;
-               if (sock != INVALID_SOCKET)
-                       pfd[npfd++] = (struct pollfd) {.fd = sock, .events = POLLIN};
-#ifdef HAVE_SYS_UN_H
-               if (usock != INVALID_SOCKET)
-                       pfd[npfd++] = (struct pollfd) {.fd = usock, .events = POLLIN};
-#endif
+               for (i = 0; i < 3; i++) {
+                       if (socks[i] != INVALID_SOCKET)
+                               pfd[npfd++] = (struct pollfd) {.fd = socks[i],
+                                                                                          .events = POLLIN};
+               }
                /* Wait up to 0.1 seconds (0.01 if testing) */
                retval = poll(pfd, npfd, GDKdebug & FORCEMITOMASK ? 10 : 100);
                if (retval == -1 && errno == EINTR)
                        continue;
 #else
-               struct timeval tv;
                fd_set fds;
                FD_ZERO(&fds);
-               if (sock != INVALID_SOCKET)
-                       FD_SET(sock, &fds);
-#ifdef HAVE_SYS_UN_H
-               if (usock != INVALID_SOCKET)
-                       FD_SET(usock, &fds);
-#endif
+               /* temporarily use msgsock to record the highest socket fd */
+               for (i = 0; i < 3; i++) {
+                       if (socks[i] != INVALID_SOCKET) {
+                               FD_SET(socks[i], &fds);
+                               if (msgsock == INVALID_SOCKET || socks[i] > msgsock)
+                                       msgsock = socks[i];
+                       }
+               }
                /* Wait up to 0.1 seconds (0.01 if testing) */
-               tv = (struct timeval) {
+               struct timeval tv = (struct timeval) {
                        .tv_usec = GDKdebug & FORCEMITOMASK ? 10000 : 100000,
                };
 
-               /* temporarily use msgsock to record the larger of sock and usock */
-#ifdef _MSC_VER
-               msgsock = 0;                    /* value is ignored on Windows */
-#else
-               msgsock = sock;
-#ifdef HAVE_SYS_UN_H
-               if (usock != INVALID_SOCKET && (sock == INVALID_SOCKET || usock > sock))
-                       msgsock = usock;
-#endif
-#endif
-               retval = select((int)msgsock + 1, &fds, NULL, NULL, &tv);
+               retval = select((int) msgsock + 1, &fds, NULL, NULL, &tv);
+               msgsock = INVALID_SOCKET;
 #endif
                if (ATOMIC_GET(&serverexiting) || GDKexiting())
                        break;
@@ -346,38 +337,45 @@ SERVERlistenThread(SOCKET *Sock)
                        }
                        continue;
                }
-               if (sock != INVALID_SOCKET &&
+               bool isusock = false;
 #ifdef HAVE_POLL
-                       (npfd > 0 && pfd[0].fd == sock && pfd[0].revents & POLLIN)
+               for (i = 0; i < (int) npfd; i++) {
+                       if (pfd[i].revents & POLLIN) {
+                               msgsock = pfd[i].fd;
+                               isusock = msgsock == socks[2];
+                               break;
+                       }
+               }
 #else
-                       FD_ISSET(sock, &fds)
+               for (i = 0; i < 3; i++) {
+                       if (socks[i] >= 0 && FD_ISSET(socks[i], &fds)) {
+                               msgsock = socks[i];
+                               isusock = i == 2;
+                               break;
+                       }
+               }
 #endif
-                       ) {
-                       if ((msgsock = accept4(sock, NULL, NULL, SOCK_CLOEXEC)) == INVALID_SOCKET) {
-                               if (
+               if (msgsock == INVALID_SOCKET)
+                       continue;
+
+               if ((msgsock = accept4(msgsock, NULL, NULL, SOCK_CLOEXEC)) == INVALID_SOCKET) {
+                       if (
 #ifdef _MSC_VER
-                                       WSAGetLastError() != WSAEINTR
+                               WSAGetLastError() != WSAEINTR
 #else
-                                       errno != EINTR
+                               errno != EINTR
 #endif
-                                       || !ATOMIC_GET(&serveractive)) {
-                                       msg = "accept failed";
-                                       goto error;
-                               }
-                               continue;
+                               || !ATOMIC_GET(&serveractive)) {
+                               msg = "accept failed";
+                               goto error;
                        }
+                       continue;
+               }
 #if defined(HAVE_FCNTL) && (!defined(SOCK_CLOEXEC) || !defined(HAVE_ACCEPT4))
-                       (void) fcntl(msgsock, F_SETFD, FD_CLOEXEC);
+               (void) fcntl(msgsock, F_SETFD, FD_CLOEXEC);
 #endif
 #ifdef HAVE_SYS_UN_H
-               } else if (usock != INVALID_SOCKET &&
-#ifdef HAVE_POLL
-                                  ((npfd > 0 && pfd[0].fd == usock && pfd[0].revents & POLLIN) ||
-                                       (npfd > 1 && pfd[1].fd == usock && pfd[1].revents & POLLIN))
-#else
-                                  FD_ISSET(usock, &fds)
-#endif
-                       ) {
+               if (isusock) {
                        struct msghdr msgh;
                        struct iovec iov;
                        char buf[1];
@@ -385,23 +383,6 @@ SERVERlistenThread(SOCKET *Sock)
                        char ccmsg[CMSG_SPACE(sizeof(int))];
                        struct cmsghdr *cmsg;
 
-                       if ((msgsock = accept4(usock, NULL, NULL, SOCK_CLOEXEC)) == INVALID_SOCKET) {
-                               if (
-#ifdef _MSC_VER
-                                       WSAGetLastError() != WSAEINTR
-#else
-                                       errno != EINTR
-#endif
-                                       ) {
-                                       msg = "accept failed";
-                                       goto error;
-                               }
-                               continue;
-                       }
-#if defined(HAVE_FCNTL) && (!defined(SOCK_CLOEXEC) || !defined(HAVE_ACCEPT4))
-                       (void) fcntl(msgsock, F_SETFD, FD_CLOEXEC);
-#endif
-
                        /* BEWARE: unix domain sockets have a slightly different
                         * behaviour initialy than normal sockets, because we can
                         * send filedescriptors or credentials with them.  To do so,
@@ -461,8 +442,6 @@ SERVERlistenThread(SOCKET *Sock)
                                        continue;
                        }
 #endif
-               } else {
-                       continue;
                }
 
                data = GDKmalloc(sizeof(*data));
@@ -511,18 +490,14 @@ SERVERlistenThread(SOCKET *Sock)
                        continue;
                }
        } while (!ATOMIC_GET(&serverexiting) && !GDKexiting());
+  error:
        (void) ATOMIC_DEC(&nlistener);
-       if (sock != INVALID_SOCKET)
-               closesocket(sock);
-       if (usock != INVALID_SOCKET)
-               closesocket(usock);
+       for (i = 0; i < 3; i++)
+               if (socks[i] != INVALID_SOCKET)
+                       closesocket(socks[i]);
+       if (msg)
+               TRC_CRITICAL(MAL_SERVER, "Terminating listener: %s\n", msg);
        return;
-error:
-       TRC_CRITICAL(MAL_SERVER, "Terminating listener: %s\n", msg);
-       if (sock != INVALID_SOCKET)
-               closesocket(sock);
-       if (usock != INVALID_SOCKET)
-               closesocket(usock);
 }
 
 #ifdef _MSC_VER
@@ -542,13 +517,12 @@ start_listen(SOCKET *sockp, int *portp, 
                .ai_socktype = SOCK_STREAM,
                .ai_protocol = IPPROTO_TCP,
        };
-       struct sockaddr_storage addr;
-       SOCKLEN addrlen = (SOCKLEN) sizeof(addr);
        int e = 0;
        int ipv6_vs6only = -1;
        SOCKET sock = INVALID_SOCKET;
        const char *err;
-       *sockp = INVALID_SOCKET;
+       int nsock = 0;
+       sockp[0] = sockp[1] = INVALID_SOCKET;
        host[0] = 0;
        if (listenaddr == NULL || strcmp(listenaddr, "localhost") == 0) {
                hints.ai_family = AF_INET6;
@@ -583,110 +557,106 @@ start_listen(SOCKET *sockp, int *portp, 
        }
        char sport[8];          /* max "65535" */
        snprintf(sport, sizeof(sport), "%d", *portp);
-       int check = getaddrinfo(listenaddr, sport, &hints, &result);
-       if (check != 0 && ipv6_vs6only == 0) {
-               /* if IPv6 didn't work and we can use IPv4 as well, try just IPv4 */
-               hints.ai_family = AF_INET;
-               ipv6_vs6only = -1;
-               if (listenaddr && strcmp(listenaddr, "::1") == 0)
-                       listenaddr = "127.0.0.1";
-               check = getaddrinfo(listenaddr, sport, &hints, &result);
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to