Changeset: 9f6babe38ddd for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/9f6babe38ddd
Modified Files:
        clients/mapilib/mapi.c
        common/stream/socket_stream.c
Branch: Aug2024
Log Message:

Implemented interrupt handling on Unix domain sockets using inline data.
Unix domain sockets do not support OOB data, so we need a different
technique.  For Unix domain sockets, we use a special message consisting
of two bytes with value 0xFF ('\377') followed by a one-byte payload.
Normally, this message should only be sent while the server is executing
a query, or while it is reading a new block of data (i.e. waiting for the
block size, which cannot have this special value).


diffs (truncated from 304 to 300 lines):

diff --git a/clients/mapilib/mapi.c b/clients/mapilib/mapi.c
--- a/clients/mapilib/mapi.c
+++ b/clients/mapilib/mapi.c
@@ -3730,8 +3730,7 @@ mapi_query_abort(MapiHdl hdl, int reason
        mapi_hdl_check(hdl);
        mid = hdl->mid;
        assert(mid->active == NULL || mid->active == hdl);
-       if (mid->oobintr && !hdl->aborted) {
-               mnstr_putoob(mid->to, reason);
+       if (mid->oobintr && !hdl->aborted && mnstr_putoob(mid->to, reason) == 0) {
                hdl->aborted = true;
                return MOK;
        }
diff --git a/common/stream/socket_stream.c b/common/stream/socket_stream.c
--- a/common/stream/socket_stream.c
+++ b/common/stream/socket_stream.c
@@ -24,6 +24,152 @@
 /* ------------------------------------------------------------------ */
 /* streams working on a socket */
 
+static int
+socket_getoob(const stream *s)
+{
+       SOCKET fd = s->stream_data.s;
+#ifdef HAVE_POLL
+       struct pollfd pfd = (struct pollfd) {
+               .fd = fd,
+               .events = POLLPRI,
+       };
+       if (poll(&pfd, 1, 0) > 0)
+#else
+       fd_set fds;
+       struct timeval t = (struct timeval) {
+               .tv_sec = 0,
+               .tv_usec = 0,
+       };
+#ifdef FD_SETSIZE
+       if (fd >= FD_SETSIZE)
+               return 0;
+#endif
+       FD_ZERO(&fds);
+       FD_SET(fd, &fds);
+       if (select(
+#ifdef _MSC_VER
+                       0,      /* ignored on Windows */
+#else
+                       fd + 1,
+#endif
+                       NULL, NULL, &fds, &t) > 0)
+#endif
+       {
+#ifdef HAVE_POLL
+               if (pfd.revents & (POLLHUP | POLLNVAL))
+                       return -1;
+               if ((pfd.revents & POLLPRI) == 0)
+                       return -1;
+#else
+               if (!FD_ISSET(fd, &fds))
+                       return 0;
+#endif
+               /* discard regular data until OOB mark */
+               for (;;) {
+                       int atmark = 0;
+                       char flush[100];
+                       if (ioctl(fd, SIOCATMARK, &atmark) < 0) {
+                               perror("ioctl");
+                               break;
+                       }
+                       if (atmark)
+                               break;
+                       if (read(fd, flush, sizeof(flush)) < 0) {
+                               perror("read");
+                               break;
+                       }
+               }
+               char b = 0;
+               switch (recv(fd, &b, 1, MSG_OOB)) {
+               case 0:
+                       /* unexpectedly didn't receive a byte */
+                       break;
+               case 1:
+                       return b;
+               case -1:
+                       perror("recv OOB");
+                       return -1;
+               }
+       }
+       return 0;
+}
+
+static int
+socket_putoob(const stream *s, char val)
+{
+       SOCKET fd = s->stream_data.s;
+       if (send(fd, &val, 1, MSG_OOB) == -1) {
+               perror("send OOB");
+               return -1;
+       }
+       return 0;
+}
+
+#ifdef PF_UNIX
+/* UNIX domain sockets do not support OOB messages, so we need to do
+ * something different */
+#define OOBMSG0        '\377'                  /* the two special bytes we send as "OOB" */
+#define OOBMSG1        '\377'
+
+static int
+socket_getoob_unix(const stream *s)
+{
+       SOCKET fd = s->stream_data.s;
+#ifdef HAVE_POLL
+       struct pollfd pfd = (struct pollfd) {
+               .fd = fd,
+               .events = POLLIN,
+       };
+       if (poll(&pfd, 1, 0) > 0)
+#else
+       fd_set fds;
+       struct timeval t = (struct timeval) {
+               .tv_sec = 0,
+               .tv_usec = 0,
+       };
+#ifdef FD_SETSIZE
+       if (fd >= FD_SETSIZE)
+               return 0;
+#endif
+       FD_ZERO(&fds);
+       FD_SET(fd, &fds);
+       if (select(
+#ifdef _MSC_VER
+                       0,      /* ignored on Windows */
+#else
+                       fd + 1,
+#endif
+                       &fds, NULL, NULL, &t) > 0)
+#endif
+               {
+                       char buf[3];
+                       ssize_t nr;
+                       nr = recv(fd, buf, 2, MSG_PEEK);
+                       if (nr == 2 && buf[0] == OOBMSG0 && buf[1] == OOBMSG1) {
+                               nr = recv(fd, buf, 3, 0);
+                               if (nr == 3)
+                                       return buf[2];
+                       }
+               }
+       return 0;
+}
+
+static int
+socket_putoob_unix(const stream *s, char val)
+{
+       char buf[3] = {
+               OOBMSG0,
+               OOBMSG1,
+               val,
+       };
+       if (send(s->stream_data.s, buf, 3, 0) == -1) {
+               perror("send");
+               return -1;
+       }
+       return 0;
+}
+#endif
+
 static ssize_t
 socket_write(stream *restrict s, const void *restrict buf, size_t elmsize, size_t cnt)
 {
@@ -127,7 +273,11 @@ socket_read(stream *restrict s, void *re
                        struct pollfd pfd;
 
                        pfd = (struct pollfd) {.fd = s->stream_data.s,
-                                              .events = POLLIN | POLLPRI};
+                                              .events = POLLIN};
+#ifdef PF_UNIX
+                       if (s->putoob != socket_putoob_unix)
+                               pfd.events |= POLLPRI;
+#endif
 
                        ret = poll(&pfd, 1, (int) s->timeout);
                        if (ret == -1 && errno == EINTR)
@@ -216,6 +366,20 @@ socket_read(stream *restrict s, void *re
                        return -1;
                }
 #endif
+#ifdef PF_UNIX
+               /* when reading a block size in a block stream
+                * (elmsize==2,cnt==1), we may actually get an "OOB" message
+                * when this is a Unix domain socket */
+               if (s->putoob == socket_putoob_unix &&
+                       elmsize == 2 && cnt == 1 && nr == 2 &&
+                       ((char *)buf)[0] == OOBMSG0 &&
+                       ((char *)buf)[1] == OOBMSG1) {
+                       /* also read (and discard) the "pay load" */
+                       (void) recv(s->stream_data.s, buf, 1, 0);
+                       mnstr_set_error(s, MNSTR_INTERRUPT, "query abort from client");
+                       return -1;
+               }
+#endif
                break;
        }
        if (nr == 0) {
@@ -335,87 +499,6 @@ socket_isalive(const stream *s)
 #endif
 }
 
-static int
-socket_getoob(const stream *s)
-{
-       SOCKET fd = s->stream_data.s;
-#ifdef HAVE_POLL
-       struct pollfd pfd = (struct pollfd) {
-               .fd = fd,
-               .events = POLLPRI,
-       };
-       if (poll(&pfd, 1, 0) > 0)
-#else
-       fd_set fds;
-       struct timeval t = (struct timeval) {
-               .tv_sec = 0,
-               .tv_usec = 0,
-       };
-#ifdef FD_SETSIZE
-       if (fd >= FD_SETSIZE)
-               return 0;
-#endif
-       FD_ZERO(&fds);
-       FD_SET(fd, &fds);
-       if (select(
-#ifdef _MSC_VER
-                       0,      /* ignored on Windows */
-#else
-                       fd + 1,
-#endif
-                       NULL, NULL, &fds, &t) > 0)
-#endif
-       {
-#ifdef HAVE_POLL
-               if (pfd.revents & (POLLHUP | POLLNVAL))
-                       return -1;
-               if ((pfd.revents & POLLPRI) == 0)
-                       return -1;
-#else
-               if (!FD_ISSET(fd, &fds))
-                       return 0;
-#endif
-               /* discard regular data until OOB mark */
-               for (;;) {
-                       int atmark = 0;
-                       char flush[100];
-                       if (ioctl(fd, SIOCATMARK, &atmark) < 0) {
-                               perror("ioctl");
-                               break;
-                       }
-                       if (atmark)
-                               break;
-                       if (read(fd, flush, sizeof(flush)) < 0) {
-                               perror("read");
-                               break;
-                       }
-               }
-               char b = 0;
-               switch (recv(fd, &b, 1, MSG_OOB)) {
-               case 0:
-                       /* unexpectedly didn't receive a byte */
-                       break;
-               case 1:
-                       return b;
-               case -1:
-                       perror("recv OOB");
-                       return -1;
-               }
-       }
-       return 0;
-}
-
-static int
-socket_putoob(const stream *s, char val)
-{
-       SOCKET fd = s->stream_data.s;
-       if (send(fd, &val, 1, MSG_OOB) == -1) {
-               perror("send OOB");
-               return -1;
-       }
-       return 0;
-}
-
 static stream *
 socket_open(SOCKET sock, const char *name)
 {
@@ -448,6 +531,12 @@ socket_open(SOCKET sock, const char *nam
                        domain = AF_INET;       /* give it a value if call fails */
        }
 #endif
+#ifdef PF_UNIX
+       if (domain == PF_UNIX) {
+               s->getoob = socket_getoob_unix;
+               s->putoob = socket_putoob_unix;
+       }
_______________________________________________
checkin-list mailing list -- checkin-list@monetdb.org
To unsubscribe send an email to checkin-list-le...@monetdb.org

Reply via email to