Changeset: 9f6babe38ddd for MonetDB URL: https://dev.monetdb.org/hg/MonetDB/rev/9f6babe38ddd Modified Files: clients/mapilib/mapi.c common/stream/socket_stream.c Branch: Aug2024 Log Message:
Implemented interrupt handling on Unix domain sockets using inline data. Unix domain sockets do not support OOB data, so we need a different technique. For Unix domain sockets, we use a special message consisting of two bytes with value 0xFF ('\377') and then the message payload. Normally, this message should only be sent when the server is executing a query, or when it is reading a new block of data (i.e. waiting for the block size, which cannot have this special value). diffs (truncated from 304 to 300 lines): diff --git a/clients/mapilib/mapi.c b/clients/mapilib/mapi.c --- a/clients/mapilib/mapi.c +++ b/clients/mapilib/mapi.c @@ -3730,8 +3730,7 @@ mapi_query_abort(MapiHdl hdl, int reason mapi_hdl_check(hdl); mid = hdl->mid; assert(mid->active == NULL || mid->active == hdl); - if (mid->oobintr && !hdl->aborted) { - mnstr_putoob(mid->to, reason); + if (mid->oobintr && !hdl->aborted && mnstr_putoob(mid->to, reason) == 0) { hdl->aborted = true; return MOK; } diff --git a/common/stream/socket_stream.c b/common/stream/socket_stream.c --- a/common/stream/socket_stream.c +++ b/common/stream/socket_stream.c @@ -24,6 +24,152 @@ /* ------------------------------------------------------------------ */ /* streams working on a socket */ +static int +socket_getoob(const stream *s) +{ + SOCKET fd = s->stream_data.s; +#ifdef HAVE_POLL + struct pollfd pfd = (struct pollfd) { + .fd = fd, + .events = POLLPRI, + }; + if (poll(&pfd, 1, 0) > 0) +#else + fd_set fds; + struct timeval t = (struct timeval) { + .tv_sec = 0, + .tv_usec = 0, + }; +#ifdef FD_SETSIZE + if (fd >= FD_SETSIZE) + return 0; +#endif + FD_ZERO(&fds); + FD_SET(fd, &fds); + if (select( +#ifdef _MSC_VER + 0, /* ignored on Windows */ +#else + fd + 1, +#endif + NULL, NULL, &fds, &t) > 0) +#endif + { +#ifdef HAVE_POLL + if (pfd.revents & (POLLHUP | POLLNVAL)) + return -1; + if ((pfd.revents & POLLPRI) == 0) + return -1; +#else + if (!FD_ISSET(fd, &fds)) + return 0; +#endif + /* discard regular data until OOB mark */ + for 
(;;) { + int atmark = 0; + char flush[100]; + if (ioctl(fd, SIOCATMARK, &atmark) < 0) { + perror("ioctl"); + break; + } + if (atmark) + break; + if (read(fd, flush, sizeof(flush)) < 0) { + perror("read"); + break; + } + } + char b = 0; + switch (recv(fd, &b, 1, MSG_OOB)) { + case 0: + /* unexpectedly didn't receive a byte */ + break; + case 1: + return b; + case -1: + perror("recv OOB"); + return -1; + } + } + return 0; +} + +static int +socket_putoob(const stream *s, char val) +{ + SOCKET fd = s->stream_data.s; + if (send(fd, &val, 1, MSG_OOB) == -1) { + perror("send OOB"); + return -1; + } + return 0; +} + +#ifdef PF_UNIX +/* UNIX domain sockets do not support OOB messages, so we need to do + * something different */ +#define OOBMSG0 '\377' /* the two special bytes we send as "OOB" */ +#define OOBMSG1 '\377' + +static int +socket_getoob_unix(const stream *s) +{ + SOCKET fd = s->stream_data.s; +#ifdef HAVE_POLL + struct pollfd pfd = (struct pollfd) { + .fd = fd, + .events = POLLIN, + }; + if (poll(&pfd, 1, 0) > 0) +#else + fd_set fds; + struct timeval t = (struct timeval) { + .tv_sec = 0, + .tv_usec = 0, + }; +#ifdef FD_SETSIZE + if (fd >= FD_SETSIZE) + return 0; +#endif + FD_ZERO(&fds); + FD_SET(fd, &fds); + if (select( +#ifdef _MSC_VER + 0, /* ignored on Windows */ +#else + fd + 1, +#endif + &fds, NULL, NULL, &t) > 0) +#endif + { + char buf[3]; + ssize_t nr; + nr = recv(fd, buf, 2, MSG_PEEK); + if (nr == 2 && buf[0] == OOBMSG0 && buf[1] == OOBMSG1) { + nr = recv(fd, buf, 3, 0); + if (nr == 3) + return buf[2]; + } + } + return 0; +} + +static int +socket_putoob_unix(const stream *s, char val) +{ + char buf[3] = { + OOBMSG0, + OOBMSG1, + val, + }; + if (send(s->stream_data.s, buf, 3, 0) == -1) { + perror("send"); + return -1; + } + return 0; +} +#endif + static ssize_t socket_write(stream *restrict s, const void *restrict buf, size_t elmsize, size_t cnt) { @@ -127,7 +273,11 @@ socket_read(stream *restrict s, void *re struct pollfd pfd; pfd = (struct pollfd) {.fd = 
s->stream_data.s, - .events = POLLIN | POLLPRI}; + .events = POLLIN}; +#ifdef PF_UNIX + if (s->putoob != socket_putoob_unix) + pfd.events |= POLLPRI; +#endif ret = poll(&pfd, 1, (int) s->timeout); if (ret == -1 && errno == EINTR) @@ -216,6 +366,20 @@ socket_read(stream *restrict s, void *re return -1; } #endif +#ifdef PF_UNIX + /* when reading a block size in a block stream + * (elmsize==2,cnt==1), we may actually get an "OOB" message + * when this is a Unix domain socket */ + if (s->putoob == socket_putoob_unix && + elmsize == 2 && cnt == 1 && nr == 2 && + ((char *)buf)[0] == OOBMSG0 && + ((char *)buf)[1] == OOBMSG1) { + /* also read (and discard) the "pay load" */ + (void) recv(s->stream_data.s, buf, 1, 0); + mnstr_set_error(s, MNSTR_INTERRUPT, "query abort from client"); + return -1; + } +#endif break; } if (nr == 0) { @@ -335,87 +499,6 @@ socket_isalive(const stream *s) #endif } -static int -socket_getoob(const stream *s) -{ - SOCKET fd = s->stream_data.s; -#ifdef HAVE_POLL - struct pollfd pfd = (struct pollfd) { - .fd = fd, - .events = POLLPRI, - }; - if (poll(&pfd, 1, 0) > 0) -#else - fd_set fds; - struct timeval t = (struct timeval) { - .tv_sec = 0, - .tv_usec = 0, - }; -#ifdef FD_SETSIZE - if (fd >= FD_SETSIZE) - return 0; -#endif - FD_ZERO(&fds); - FD_SET(fd, &fds); - if (select( -#ifdef _MSC_VER - 0, /* ignored on Windows */ -#else - fd + 1, -#endif - NULL, NULL, &fds, &t) > 0) -#endif - { -#ifdef HAVE_POLL - if (pfd.revents & (POLLHUP | POLLNVAL)) - return -1; - if ((pfd.revents & POLLPRI) == 0) - return -1; -#else - if (!FD_ISSET(fd, &fds)) - return 0; -#endif - /* discard regular data until OOB mark */ - for (;;) { - int atmark = 0; - char flush[100]; - if (ioctl(fd, SIOCATMARK, &atmark) < 0) { - perror("ioctl"); - break; - } - if (atmark) - break; - if (read(fd, flush, sizeof(flush)) < 0) { - perror("read"); - break; - } - } - char b = 0; - switch (recv(fd, &b, 1, MSG_OOB)) { - case 0: - /* unexpectedly didn't receive a byte */ - break; - case 1: - 
return b; - case -1: - perror("recv OOB"); - return -1; - } - } - return 0; -} - -static int -socket_putoob(const stream *s, char val) -{ - SOCKET fd = s->stream_data.s; - if (send(fd, &val, 1, MSG_OOB) == -1) { - perror("send OOB"); - return -1; - } - return 0; -} - static stream * socket_open(SOCKET sock, const char *name) { @@ -448,6 +531,12 @@ socket_open(SOCKET sock, const char *nam domain = AF_INET; /* give it a value if call fails */ } #endif +#ifdef PF_UNIX + if (domain == PF_UNIX) { + s->getoob = socket_getoob_unix; + s->putoob = socket_putoob_unix; + } _______________________________________________ checkin-list mailing list -- checkin-list@monetdb.org To unsubscribe send an email to checkin-list-le...@monetdb.org