Changeset: fc2322735185 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/fc2322735185
Modified Files:
	gdk/gdk.h
	gdk/gdk_join.c
Branch: default
Log Message:

Merge with Aug2024 branch. diffs (truncated from 630 to 300 lines): diff --git a/clients/mapilib/mapi.c b/clients/mapilib/mapi.c --- a/clients/mapilib/mapi.c +++ b/clients/mapilib/mapi.c @@ -3730,8 +3730,7 @@ mapi_query_abort(MapiHdl hdl, int reason mapi_hdl_check(hdl); mid = hdl->mid; assert(mid->active == NULL || mid->active == hdl); - if (mid->oobintr && !hdl->aborted) { - mnstr_putoob(mid->to, reason); + if (mid->oobintr && !hdl->aborted && mnstr_putoob(mid->to, reason) == 0) { hdl->aborted = true; return MOK; } diff --git a/common/stream/socket_stream.c b/common/stream/socket_stream.c --- a/common/stream/socket_stream.c +++ b/common/stream/socket_stream.c @@ -18,11 +18,160 @@ #ifdef HAVE_SYS_TIME_H #include <sys/time.h> #endif +#ifdef HAVE_SYS_IOCTL_H +#include <sys/ioctl.h> +#endif /* ------------------------------------------------------------------ */ /* streams working on a socket */ +static int +socket_getoob(const stream *s) +{ + SOCKET fd = s->stream_data.s; +#ifdef HAVE_POLL + struct pollfd pfd = (struct pollfd) { + .fd = fd, + .events = POLLPRI, + }; + if (poll(&pfd, 1, 0) > 0) +#else + fd_set fds; + struct timeval t = (struct timeval) { + .tv_sec = 0, + .tv_usec = 0, + }; +#ifdef FD_SETSIZE + if (fd >= FD_SETSIZE) + return 0; +#endif + FD_ZERO(&fds); + FD_SET(fd, &fds); + if (select( +#ifdef _MSC_VER + 0, /* ignored on Windows */ +#else + fd + 1, +#endif + NULL, NULL, &fds, &t) > 0) +#endif + { +#ifdef HAVE_POLL + if (pfd.revents & (POLLHUP | POLLNVAL)) + return -1; + if ((pfd.revents & POLLPRI) == 0) + return -1; +#else + if (!FD_ISSET(fd, &fds)) + return 0; +#endif + /* discard regular data until OOB mark */ + for (;;) { + int atmark = 0; + char flush[100]; + if (ioctlsocket(fd, SIOCATMARK, &atmark) < 0) { + perror("ioctl"); + break; + } + if (atmark) + break; + if (recv(fd, flush, sizeof(flush), 0) < 0) { + perror("recv"); + break; + } + } + char b = 0; + switch (recv(fd, &b, 1, MSG_OOB)) { + case 0: + /* unexpectedly didn't receive a byte */ + 
break; + case 1: + return b; + case -1: + perror("recv OOB"); + return -1; + } + } + return 0; +} + +static int +socket_putoob(const stream *s, char val) +{ + SOCKET fd = s->stream_data.s; + if (send(fd, &val, 1, MSG_OOB) == -1) { + perror("send OOB"); + return -1; + } + return 0; +} + +#ifdef AF_UNIX +/* UNIX domain sockets do not support OOB messages, so we need to do + * something different */ +#define OOBMSG0 '\377' /* the two special bytes we send as "OOB" */ +#define OOBMSG1 '\377' + +static int +socket_getoob_unix(const stream *s) +{ + SOCKET fd = s->stream_data.s; +#ifdef HAVE_POLL + struct pollfd pfd = (struct pollfd) { + .fd = fd, + .events = POLLIN, + }; + if (poll(&pfd, 1, 0) > 0) +#else + fd_set fds; + struct timeval t = (struct timeval) { + .tv_sec = 0, + .tv_usec = 0, + }; +#ifdef FD_SETSIZE + if (fd >= FD_SETSIZE) + return 0; +#endif + FD_ZERO(&fds); + FD_SET(fd, &fds); + if (select( +#ifdef _MSC_VER + 0, /* ignored on Windows */ +#else + fd + 1, +#endif + &fds, NULL, NULL, &t) > 0) +#endif + { + char buf[3]; + ssize_t nr; + nr = recv(fd, buf, 2, MSG_PEEK); + if (nr == 2 && buf[0] == OOBMSG0 && buf[1] == OOBMSG1) { + nr = recv(fd, buf, 3, 0); + if (nr == 3) + return buf[2]; + } + } + return 0; +} + +static int +socket_putoob_unix(const stream *s, char val) +{ + char buf[3] = { + OOBMSG0, + OOBMSG1, + val, + }; + if (send(s->stream_data.s, buf, 3, 0) == -1) { + perror("send"); + return -1; + } + return 0; +} +#endif + static ssize_t socket_write(stream *restrict s, const void *restrict buf, size_t elmsize, size_t cnt) { @@ -103,22 +252,21 @@ socket_read(stream *restrict s, void *re { #ifdef _MSC_VER int nr = 0; + int size; + if (elmsize * cnt > INT_MAX) + size = (int) (elmsize * (INT_MAX / elmsize)); + else + size = (int) (elmsize * cnt); #else ssize_t nr = 0; + size_t size = elmsize * cnt; #endif - size_t size = elmsize * cnt; if (s->errkind != MNSTR_NO__ERROR) return -1; if (size == 0) return 0; -#ifdef _MSC_VER - /* recv only takes an int 
parameter, and read does not accept - * sockets */ - if (size > INT_MAX) - size = elmsize * (INT_MAX / elmsize); -#endif for (;;) { if (s->timeout) { int ret; @@ -126,7 +274,11 @@ socket_read(stream *restrict s, void *re struct pollfd pfd; pfd = (struct pollfd) {.fd = s->stream_data.s, - .events = POLLIN | POLLPRI}; + .events = POLLIN}; +#ifdef AF_UNIX + if (s->putoob != socket_putoob_unix) + pfd.events |= POLLPRI; +#endif ret = poll(&pfd, 1, (int) s->timeout); if (ret == -1 && errno == EINTR) @@ -136,6 +288,21 @@ socket_read(stream *restrict s, void *re return -1; } if (ret == 1 && pfd.revents & POLLPRI) { + /* discard regular data until OOB mark */ + for (;;) { + int atmark = 0; + char flush[100]; + if (ioctlsocket(s->stream_data.s, SIOCATMARK, &atmark) < 0) { + perror("ioctl"); + break; + } + if (atmark) + break; + if (recv(s->stream_data.s, flush, sizeof(flush), 0) < 0) { + perror("recv"); + break; + } + } char b = 0; switch (recv(s->stream_data.s, &b, 1, MSG_OOB)) { case 0: @@ -187,16 +354,22 @@ socket_read(stream *restrict s, void *re assert(FD_ISSET(s->stream_data.s, &fds)); #endif } -#ifdef _MSC_VER - nr = recv(s->stream_data.s, buf, (int) size, 0); + nr = recv(s->stream_data.s, buf, size, 0); if (nr == SOCKET_ERROR) { - mnstr_set_error_errno(s, MNSTR_READ_ERROR, "recv"); + mnstr_set_error_errno(s, errno == EINTR ? MNSTR_INTERRUPT : MNSTR_READ_ERROR, NULL); return -1; } -#else - nr = read(s->stream_data.s, buf, size); - if (nr == -1) { - mnstr_set_error_errno(s, errno == EINTR ? 
MNSTR_INTERRUPT : MNSTR_READ_ERROR, NULL); +#ifdef AF_UNIX + /* when reading a block size in a block stream + * (elmsize==2,cnt==1), we may actually get an "OOB" message + * when this is a Unix domain socket */ + if (s->putoob == socket_putoob_unix && + elmsize == 2 && cnt == 1 && nr == 2 && + ((char *)buf)[0] == OOBMSG0 && + ((char *)buf)[1] == OOBMSG1) { + /* also read (and discard) the "pay load" */ + (void) recv(s->stream_data.s, buf, 1, 0); + mnstr_set_error(s, MNSTR_INTERRUPT, "query abort from client"); return -1; } #endif @@ -319,72 +492,6 @@ socket_isalive(const stream *s) #endif } -static int -socket_getoob(const stream *s) -{ - SOCKET fd = s->stream_data.s; -#ifdef HAVE_POLL - struct pollfd pfd = (struct pollfd) { - .fd = fd, - .events = POLLPRI, - }; - if (poll(&pfd, 1, 0) > 0) -#else - fd_set fds; - struct timeval t = (struct timeval) { - .tv_sec = 0, - .tv_usec = 0, - }; -#ifdef FD_SETSIZE - if (fd >= FD_SETSIZE) - return 0; -#endif - FD_ZERO(&fds); - FD_SET(fd, &fds); - if (select( -#ifdef _MSC_VER - 0, /* ignored on Windows */ _______________________________________________ checkin-list mailing list -- checkin-list@monetdb.org To unsubscribe send an email to checkin-list-le...@monetdb.org