Changeset: e5b1de7dc9a2 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/e5b1de7dc9a2
Modified Files:
        clients/Tests/exports.stable.out
        clients/mapiclient/mclient.c
        clients/mapilib/mapi.c
        clients/mapilib/mapi.h
        common/stream/socket_stream.c
        common/stream/stream.c
        common/stream/stream.h
        common/stream/stream_internal.h
        common/stream/winio.c
        monetdb5/mal/mal_interpreter.c
        monetdb5/modules/mal/mal_mapi.c
Branch: Aug2024
Log Message:

Got interrupting queries to work on Windows.
Some of the issues encountered:
- the signal disposition reverts to the default (i.e. kill the process)
when a signal is delivered, so the handler has to reinstall itself
every time it is called;
- the signal handler is called in a thread specially created for the
purpose, and a call to ReadConsole that is executing at the same time
returns with no error indication after having read 0 bytes; this is
indistinguishable from end-of-file, and no synchronization is possible
(we simply sleep for an arbitrary 0.1 second so that the interrupt
handler gets a chance to run);
- a call to recv that is in progress when the interrupt arrives is not
interrupted, so we need to use select with a timeout to check for
interrupts periodically (some interfaces had to change to make this
possible);
- supposedly we should use the SIOCATMARK ioctl (through the function
ioctlsocket instead of ioctl on Windows), but it never reports that we
are at the OOB mark, so we had to skip that check there.


diffs (truncated from 463 to 300 lines):

diff --git a/clients/Tests/exports.stable.out b/clients/Tests/exports.stable.out
--- a/clients/Tests/exports.stable.out
+++ b/clients/Tests/exports.stable.out
@@ -717,6 +717,7 @@ MapiMsg mapi_seek_row(MapiHdl hdl, int64
 MapiHdl mapi_send(Mapi mid, const char *cmd) __attribute__((__nonnull__(1)));
 MapiMsg mapi_setAutocommit(Mapi mid, bool autocommit) 
__attribute__((__nonnull__(1)));
 MapiMsg mapi_set_columnar_protocol(Mapi mid, bool columnar_protocol) 
__attribute__((__nonnull__(1)));
+MapiMsg mapi_set_rtimeout(Mapi mid, unsigned int timeout, bool 
(*callback)(void *), void *callback_data) __attribute__((__nonnull__(1)));
 MapiMsg mapi_set_size_header(Mapi mid, bool value) 
__attribute__((__nonnull__(1)));
 MapiMsg mapi_set_time_zone(Mapi mid, int seconds_east_of_utc) 
__attribute__((__nonnull__(1)));
 MapiMsg mapi_set_timeout(Mapi mid, unsigned int timeout, bool (*callback)(void 
*), void *callback_data) __attribute__((__nonnull__(1)));
@@ -1738,6 +1739,7 @@ int mnstr_readStr(stream *restrict s, ch
 ssize_t mnstr_read_block(stream *restrict s, void *restrict buf, size_t 
elmsize, size_t cnt);
 ssize_t mnstr_readline(stream *restrict s, void *restrict buf, size_t maxcnt);
 void mnstr_set_bigendian(stream *s, bool bigendian);
+void mnstr_set_error(stream *s, mnstr_error_kind kind, const char *fmt, ...) 
__attribute__((__format__(__printf__, 3, 4)));
 void mnstr_settimeout(stream *s, unsigned int ms, bool (*func)(void *), void 
*data);
 const char *mnstr_version(void);
 ssize_t mnstr_write(stream *restrict s, const void *restrict buf, size_t 
elmsize, size_t cnt);
diff --git a/clients/mapiclient/mclient.c b/clients/mapiclient/mclient.c
--- a/clients/mapiclient/mclient.c
+++ b/clients/mapiclient/mclient.c
@@ -25,6 +25,7 @@
 #  include "getopt.h"
 # endif
 #endif
+#include "stream.h"
 #include "mapi.h"
 #include <unistd.h>
 #include <string.h>
@@ -38,7 +39,6 @@
 #include <readline/history.h>
 #include "ReadlineTools.h"
 #endif
-#include "stream.h"
 #include "msqldump.h"
 #define LIBMUTILS 1
 #include "mprompt.h"
@@ -1308,6 +1308,10 @@ sigint_handler(int signum)
        (void) signum;
 
        state = INTERRUPT;
+#ifndef HAVE_SIGACTION
+       if (signal(signum, sigint_handler) == SIG_ERR)
+               perror("Could not reinstall sigal handler");
+#endif
 #ifdef HAVE_LIBREADLINE
        readline_int_handler();
 #endif
@@ -2289,7 +2293,7 @@ doFile(Mapi mid, stream *fp, bool useins
                        char *newbuf;
                        state = READING;
                        l = mnstr_readline(fp, buf + length, bufsiz - length);
-                       if (l == -1 && state == INTERRUPT) {
+                       if (l <= 0 && state == INTERRUPT) {
                                /* we were interrupted */
                                mnstr_clearerr(fp);
                                mnstr_write(toConsole, "\n", 1, 1);
@@ -3293,8 +3297,19 @@ isfile(FILE *fp)
        return true;
 }
 
+static bool
+interrupted(void *m)
+{
+       Mapi mid = m;
+       if (state == INTERRUPT) {
+               mnstr_set_error(mapi_get_from(mid), MNSTR_INTERRUPT, NULL);
+               return true;
+       }
+       return false;
+}
+
 static void
-catch_interrupts(void)
+catch_interrupts(Mapi mid)
 {
 #ifdef HAVE_SIGACTION
        struct sigaction sa;
@@ -3309,6 +3324,7 @@ catch_interrupts(void)
                perror("Could not install signal handler");
        }
 #endif
+       mapi_set_rtimeout(mid, 100, interrupted, mid);
 }
 
 int
@@ -3743,7 +3759,7 @@ main(int argc, char **argv)
        if (!has_fileargs && command == NULL && isatty(fileno(stdin))) {
                char *lang;
 
-               catch_interrupts();
+               catch_interrupts(mid);
 
                if (mode == SQL) {
                        lang = "/SQL";
@@ -3843,7 +3859,7 @@ main(int argc, char **argv)
 
                        if (s == NULL) {
                                if (strcmp(arg, "-") == 0) {
-                                       catch_interrupts();
+                                       catch_interrupts(mid);
                                        s = stdin_rastream();
                                } else {
                                        s = open_rastream(arg);
diff --git a/clients/mapilib/mapi.c b/clients/mapilib/mapi.c
--- a/clients/mapilib/mapi.c
+++ b/clients/mapilib/mapi.c
@@ -2401,6 +2401,16 @@ prepareQuery(MapiHdl hdl, const char *cm
 
 
 MapiMsg
+mapi_set_rtimeout(Mapi mid, unsigned int timeout, bool (*callback)(void *), 
void *callback_data)
+{
+       mapi_check(mid);
+       if (mid->trace)
+               printf("Set timeout to %u\n", timeout);
+       mnstr_settimeout(mid->from, timeout, callback, callback_data);
+       return MOK;
+}
+
+MapiMsg
 mapi_set_timeout(Mapi mid, unsigned int timeout, bool (*callback)(void *), 
void *callback_data)
 {
        mapi_check(mid);
diff --git a/clients/mapilib/mapi.h b/clients/mapilib/mapi.h
--- a/clients/mapilib/mapi.h
+++ b/clients/mapilib/mapi.h
@@ -208,6 +208,8 @@ mapi_export MapiMsg mapi_cache_freeup(Ma
 mapi_export MapiMsg mapi_seek_row(MapiHdl hdl, int64_t rowne, int whence)
        __attribute__((__nonnull__(1)));
 
+mapi_export MapiMsg mapi_set_rtimeout(Mapi mid, unsigned int timeout, bool 
(*callback)(void *), void *callback_data)
+       __attribute__((__nonnull__(1)));
 mapi_export MapiMsg mapi_set_timeout(Mapi mid, unsigned int timeout, bool 
(*callback)(void *), void *callback_data)
        __attribute__((__nonnull__(1)));
 mapi_export MapiMsg mapi_timeout(Mapi mid, unsigned int time)
diff --git a/common/stream/socket_stream.c b/common/stream/socket_stream.c
--- a/common/stream/socket_stream.c
+++ b/common/stream/socket_stream.c
@@ -37,24 +37,26 @@ socket_getoob(const stream *s)
        };
        if (poll(&pfd, 1, 0) > 0)
 #else
-       fd_set fds;
+       fd_set xfds;
        struct timeval t = (struct timeval) {
                .tv_sec = 0,
                .tv_usec = 0,
        };
+#ifndef _MSC_VER
 #ifdef FD_SETSIZE
        if (fd >= FD_SETSIZE)
                return 0;
 #endif
-       FD_ZERO(&fds);
-       FD_SET(fd, &fds);
+#endif
+       FD_ZERO(&xfds);
+       FD_SET(fd, &xfds);
        if (select(
 #ifdef _MSC_VER
                        0,      /* ignored on Windows */
 #else
                        fd + 1,
 #endif
-                       NULL, NULL, &fds, &t) > 0)
+                       NULL, NULL, &xfds, &t) > 0)
 #endif
        {
 #ifdef HAVE_POLL
@@ -63,10 +65,11 @@ socket_getoob(const stream *s)
                if ((pfd.revents & POLLPRI) == 0)
                        return -1;
 #else
-               if (!FD_ISSET(fd, &fds))
+               if (!FD_ISSET(fd, &xfds))
                        return 0;
 #endif
                /* discard regular data until OOB mark */
+#ifndef _MSC_VER                               /* Windows has to be 
different... */
                for (;;) {
                        int atmark = 0;
                        char flush[100];
@@ -81,6 +84,7 @@ socket_getoob(const stream *s)
                                break;
                        }
                }
+#endif
                char b = 0;
                switch (recv(fd, &b, 1, MSG_OOB)) {
                case 0:
@@ -107,7 +111,7 @@ socket_putoob(const stream *s, char val)
        return 0;
 }
 
-#ifdef AF_UNIX
+#ifdef HAVE_SYS_UN_H
 /* UNIX domain sockets do not support OOB messages, so we need to do
  * something different */
 #define OOBMSG0        '\377'                  /* the two special bytes we 
send as "OOB" */
@@ -129,10 +133,12 @@ socket_getoob_unix(const stream *s)
                .tv_sec = 0,
                .tv_usec = 0,
        };
+#ifndef _MSC_VER
 #ifdef FD_SETSIZE
        if (fd >= FD_SETSIZE)
                return 0;
 #endif
+#endif
        FD_ZERO(&fds);
        FD_SET(fd, &fds);
        if (select(
@@ -176,7 +182,7 @@ static ssize_t
 socket_write(stream *restrict s, const void *restrict buf, size_t elmsize, 
size_t cnt)
 {
        size_t size = elmsize * cnt, res = 0;
-#ifdef NATIVE_WIN32
+#ifdef _MSC_VER
        int nr = 0;
 #else
        ssize_t nr = 0;
@@ -191,12 +197,14 @@ socket_write(stream *restrict s, const v
        errno = 0;
        while (res < size &&
               (
-#ifdef NATIVE_WIN32
-                      /* send works on int, make sure the argument fits */
-                      ((nr = send(s->stream_data.s, (const char *) buf + res, 
(int) min(size - res, 1 << 16), 0)) > 0)
+                      /* Windows send works on int, make sure the argument 
fits */
+                      ((nr = send(s->stream_data.s, (const char *) buf + res,
+#ifdef _MSC_VER
+                                                  (int) min(size - res, 1 << 
16)
 #else
-                      ((nr = write(s->stream_data.s, (const char *) buf + res, 
size - res)) > 0)
+                                                  size
 #endif
+                                                  , 0)) > 0)
                       || (nr < 0 &&    /* syscall failed */
                           s->timeout > 0 &&    /* potentially timeout */
 #ifdef _MSC_VER
@@ -210,7 +218,7 @@ socket_write(stream *restrict s, const v
 #endif
                           s->timeout_func != NULL &&   /* callback function 
exists */
                           !s->timeout_func(s->timeout_data))   /* callback 
says don't stop */
-                      ||(nr < 0 &&
+                      || (nr < 0 &&
 #ifdef _MSC_VER
                          WSAGetLastError() == WSAEINTR
 #else
@@ -275,7 +283,7 @@ socket_read(stream *restrict s, void *re
 
                        pfd = (struct pollfd) {.fd = s->stream_data.s,
                                               .events = POLLIN};
-#ifdef AF_UNIX
+#ifdef HAVE_SYS_UN_H
                        if (s->putoob != socket_putoob_unix)
                                pfd.events |= POLLPRI;
 #endif
@@ -318,7 +326,7 @@ socket_read(stream *restrict s, void *re
                        }
 #else
                        struct timeval tv;
-                       fd_set fds;
+                       fd_set fds, xfds;
 
                        errno = 0;
 #ifdef _MSC_VER
@@ -326,6 +334,8 @@ socket_read(stream *restrict s, void *re
 #endif
                        FD_ZERO(&fds);
                        FD_SET(s->stream_data.s, &fds);
+                       FD_ZERO(&xfds);
+                       FD_SET(s->stream_data.s, &xfds);
                        tv.tv_sec = s->timeout / 1000;
                        tv.tv_usec = (s->timeout % 1000) * 1000;
                        ret = select(
@@ -334,11 +344,43 @@ socket_read(stream *restrict s, void *re
 #else
                                s->stream_data.s + 1,
 #endif
-                               &fds, NULL, NULL, &tv);
+                               &fds, NULL, &xfds, &tv);
                        if (ret == SOCKET_ERROR) {
                                mnstr_set_error_errno(s, MNSTR_READ_ERROR, 
"select");
                                return -1;
                        }
+                       if (ret > 0 && FD_ISSET(s->stream_data.s, &xfds)) {
+                               /* discard regular data until OOB mark */
+#ifndef _MSC_VER                               /* Windows has to be 
different... */
+                               for (;;) {
+                                       int atmark = 0;
+                                       char flush[100];
+                                       if (ioctlsocket(s->stream_data.s, 
SIOCATMARK, &atmark) < 0) {
+                                               perror("ioctl");
+                                               break;
+                                       }
+                                       if (atmark)
+                                               break;
_______________________________________________
checkin-list mailing list -- checkin-list@monetdb.org
To unsubscribe send an email to checkin-list-le...@monetdb.org

Reply via email to