Changeset: 475af50e54c4 for MonetDB URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=475af50e54c4 Branch: Oct2020 Log Message:
merged diffs (truncated from 707 to 300 lines): diff --git a/common/stream/Tests/All b/common/stream/Tests/All --- a/common/stream/Tests/All +++ b/common/stream/Tests/All @@ -1,13 +1,13 @@ read_uncompressed HAVE_LIBBZ2?read_bz2 HAVE_LIBZ?read_gz -HAVE_LIBLZ4?read_lz4 +HAVE_LIBLZ4&HAVE_PYTHON_LZ4?read_lz4 HAVE_LIBLZMA?read_xz write_uncompressed HAVE_LIBBZ2?write_bz2 HAVE_LIBZ?write_gz -HAVE_LIBLZ4?write_lz4 +HAVE_LIBLZ4&HAVE_PYTHON_LZ4?write_lz4 HAVE_LIBLZMA?write_xz urlstream diff --git a/common/stream/Tests/testdata.py b/common/stream/Tests/testdata.py --- a/common/stream/Tests/testdata.py +++ b/common/stream/Tests/testdata.py @@ -2,10 +2,7 @@ # Generating test files and verifying them after transmission. -import bz2 import gzip -import lz4.frame -import lzma import os import sys @@ -208,10 +205,13 @@ class TestFile: elif self.compression == 'gz': f = gzip.GzipFile(filename, 'wb', fileobj=fileobj, mtime=131875200, compresslevel=1) elif self.compression == 'bz2': + import bz2 f = bz2.BZ2File(fileobj, 'wb', compresslevel=1) elif self.compression == 'xz': + import lzma f = lzma.LZMAFile(fileobj, 'wb', preset=1) elif self.compression == 'lz4': # ok + import lz4.frame f = lz4.frame.LZ4FrameFile(fileobj, 'wb', compression_level=1) else: raise Exception("Unknown compression scheme: " + self.compression) @@ -225,10 +225,13 @@ class TestFile: elif self.compression == 'gz': f = gzip.GzipFile(filename, 'rb', mtime=131875200) elif self.compression == 'bz2': + import bz2 f = bz2.BZ2File(filename, 'rb') elif self.compression == 'xz': + import lzma f = lzma.LZMAFile(filename, 'rb') elif self.compression == 'lz4': + import lz4.frame f = lz4.frame.LZ4FrameFile(filename, 'rb') else: raise Exception("Unknown compression scheme: " + self.compression) diff --git a/monetdb5/modules/mal/mal_mapi.c b/monetdb5/modules/mal/mal_mapi.c --- a/monetdb5/modules/mal/mal_mapi.c +++ b/monetdb5/modules/mal/mal_mapi.c @@ -272,60 +272,51 @@ static ATOMIC_TYPE threadno = ATOMIC_VAR static void SERVERlistenThread(SOCKET *Sock) { - char *msg = 0; + char *msg = NULL; int retval; - SOCKET sock = Sock[0]; - SOCKET usock = Sock[1]; - SOCKET msgsock = INVALID_SOCKET; + SOCKET socks[3] = {Sock[0], Sock[1], Sock[2]}; struct challengedata *data; MT_Id tid; stream *s; + int i; GDKfree(Sock); (void) ATOMIC_INC(&nlistener); do { + SOCKET msgsock = INVALID_SOCKET; #ifdef HAVE_POLL - struct pollfd pfd[2]; + struct pollfd pfd[3]; nfds_t npfd; npfd = 0; - if (sock != INVALID_SOCKET) - pfd[npfd++] = (struct pollfd) {.fd = sock, .events = POLLIN}; -#ifdef HAVE_SYS_UN_H - if (usock != INVALID_SOCKET) - pfd[npfd++] = (struct pollfd) {.fd = usock, .events = POLLIN}; -#endif + for (i = 0; i < 3; i++) { + if (socks[i] != INVALID_SOCKET) + pfd[npfd++] = (struct pollfd) {.fd = socks[i], + .events = POLLIN}; + } /* Wait up to 0.1 seconds (0.01 if testing) */ retval = poll(pfd, npfd, GDKdebug & FORCEMITOMASK ? 10 : 100); if (retval == -1 && errno == EINTR) continue; #else - struct timeval tv; fd_set fds; FD_ZERO(&fds); - if (sock != INVALID_SOCKET) - FD_SET(sock, &fds); -#ifdef HAVE_SYS_UN_H - if (usock != INVALID_SOCKET) - FD_SET(usock, &fds); -#endif + /* temporarily use msgsock to record the highest socket fd */ + for (i = 0; i < 3; i++) { + if (socks[i] != INVALID_SOCKET) { + FD_SET(socks[i], &fds); + if (msgsock == INVALID_SOCKET || socks[i] > msgsock) + msgsock = socks[i]; + } + } /* Wait up to 0.1 seconds (0.01 if testing) */ - tv = (struct timeval) { + struct timeval tv = (struct timeval) { .tv_usec = GDKdebug & FORCEMITOMASK ? 10000 : 100000, }; - /* temporarily use msgsock to record the larger of sock and usock */ -#ifdef _MSC_VER - msgsock = 0; /* value is ignored on Windows */ -#else - msgsock = sock; -#ifdef HAVE_SYS_UN_H - if (usock != INVALID_SOCKET && (sock == INVALID_SOCKET || usock > sock)) - msgsock = usock; -#endif -#endif - retval = select((int)msgsock + 1, &fds, NULL, NULL, &tv); + retval = select((int) msgsock + 1, &fds, NULL, NULL, &tv); + msgsock = INVALID_SOCKET; #endif if (ATOMIC_GET(&serverexiting) || GDKexiting()) break; @@ -346,38 +337,45 @@ SERVERlistenThread(SOCKET *Sock) } continue; } - if (sock != INVALID_SOCKET && + bool isusock = false; #ifdef HAVE_POLL - (npfd > 0 && pfd[0].fd == sock && pfd[0].revents & POLLIN) + for (i = 0; i < (int) npfd; i++) { + if (pfd[i].revents & POLLIN) { + msgsock = pfd[i].fd; + isusock = msgsock == socks[2]; + break; + } + } #else - FD_ISSET(sock, &fds) + for (i = 0; i < 3; i++) { + if (socks[i] >= 0 && FD_ISSET(socks[i], &fds)) { + msgsock = socks[i]; + isusock = i == 2; + break; + } + } #endif - ) { - if ((msgsock = accept4(sock, NULL, NULL, SOCK_CLOEXEC)) == INVALID_SOCKET) { - if ( + if (msgsock == INVALID_SOCKET) + continue; + + if ((msgsock = accept4(msgsock, NULL, NULL, SOCK_CLOEXEC)) == INVALID_SOCKET) { + if ( #ifdef _MSC_VER - WSAGetLastError() != WSAEINTR + WSAGetLastError() != WSAEINTR #else - errno != EINTR + errno != EINTR #endif - || !ATOMIC_GET(&serveractive)) { - msg = "accept failed"; - goto error; - } - continue; + || !ATOMIC_GET(&serveractive)) { + msg = "accept failed"; + goto error; } + continue; + } #if defined(HAVE_FCNTL) && (!defined(SOCK_CLOEXEC) || !defined(HAVE_ACCEPT4)) - (void) fcntl(msgsock, F_SETFD, FD_CLOEXEC); + (void) fcntl(msgsock, F_SETFD, FD_CLOEXEC); #endif #ifdef HAVE_SYS_UN_H - } else if (usock != INVALID_SOCKET && -#ifdef HAVE_POLL - ((npfd > 0 && pfd[0].fd == usock && pfd[0].revents & POLLIN) || - (npfd > 1 && pfd[1].fd == usock && pfd[1].revents & POLLIN)) -#else - FD_ISSET(usock, &fds) -#endif - ) { + if (isusock) { struct msghdr msgh; struct iovec iov; char buf[1]; @@ -385,23 +383,6 @@ SERVERlistenThread(SOCKET *Sock) char ccmsg[CMSG_SPACE(sizeof(int))]; struct cmsghdr *cmsg; - if ((msgsock = accept4(usock, NULL, NULL, SOCK_CLOEXEC)) == INVALID_SOCKET) { - if ( -#ifdef _MSC_VER - WSAGetLastError() != WSAEINTR -#else - errno != EINTR -#endif - ) { - msg = "accept failed"; - goto error; - } - continue; - } -#if defined(HAVE_FCNTL) && (!defined(SOCK_CLOEXEC) || !defined(HAVE_ACCEPT4)) - (void) fcntl(msgsock, F_SETFD, FD_CLOEXEC); -#endif - /* BEWARE: unix domain sockets have a slightly different * behaviour initialy than normal sockets, because we can * send filedescriptors or credentials with them. To do so, @@ -461,8 +442,6 @@ SERVERlistenThread(SOCKET *Sock) continue; } #endif - } else { - continue; } data = GDKmalloc(sizeof(*data)); @@ -511,18 +490,14 @@ SERVERlistenThread(SOCKET *Sock) continue; } } while (!ATOMIC_GET(&serverexiting) && !GDKexiting()); + error: (void) ATOMIC_DEC(&nlistener); - if (sock != INVALID_SOCKET) - closesocket(sock); - if (usock != INVALID_SOCKET) - closesocket(usock); + for (i = 0; i < 3; i++) + if (socks[i] != INVALID_SOCKET) + closesocket(socks[i]); + if (msg) + TRC_CRITICAL(MAL_SERVER, "Terminating listener: %s\n", msg); return; -error: - TRC_CRITICAL(MAL_SERVER, "Terminating listener: %s\n", msg); - if (sock != INVALID_SOCKET) - closesocket(sock); - if (usock != INVALID_SOCKET) - closesocket(usock); } #ifdef _MSC_VER @@ -542,13 +517,12 @@ start_listen(SOCKET *sockp, int *portp, .ai_socktype = SOCK_STREAM, .ai_protocol = IPPROTO_TCP, }; - struct sockaddr_storage addr; - SOCKLEN addrlen = (SOCKLEN) sizeof(addr); int e = 0; int ipv6_vs6only = -1; SOCKET sock = INVALID_SOCKET; const char *err; - *sockp = INVALID_SOCKET; + int nsock = 0; + sockp[0] = sockp[1] = INVALID_SOCKET; host[0] = 0; if (listenaddr == NULL || strcmp(listenaddr, "localhost") == 0) { hints.ai_family = AF_INET6; @@ -583,110 +557,106 @@ start_listen(SOCKET *sockp, int *portp, } char sport[8]; /* max "65535" */ snprintf(sport, sizeof(sport), "%d", *portp); - int check = getaddrinfo(listenaddr, sport, &hints, &result); - if (check != 0 && ipv6_vs6only == 0) { - /* if IPv6 didn't work and we can use IPv4 as well, try just IPv4 */ - hints.ai_family = AF_INET; - ipv6_vs6only = -1; - if (listenaddr && strcmp(listenaddr, "::1") == 0) - listenaddr = "127.0.0.1"; - check = getaddrinfo(listenaddr, sport, &hints, &result); _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list