Changeset: e5b1de7dc9a2 for MonetDB URL: https://dev.monetdb.org/hg/MonetDB/rev/e5b1de7dc9a2 Modified Files: clients/Tests/exports.stable.out clients/mapiclient/mclient.c clients/mapilib/mapi.c clients/mapilib/mapi.h common/stream/socket_stream.c common/stream/stream.c common/stream/stream.h common/stream/stream_internal.h common/stream/winio.c monetdb5/mal/mal_interpreter.c monetdb5/modules/mal/mal_mapi.c Branch: Aug2024 Log Message:
Got interrupting queries to work on Windows. Some of the issues encountered: - the signal handler reverts to the default (i.e. kill process) when a signal is delivered, so we need to reinstate it; - the signal handler is called in a thread specially created for the purpose, and a call to ReadConsole that is being executed at the same time returns with no error indication after having read 0 bytes, so this is indistinguishable from end-of-file, and no synchronization is possible (we just randomly sleep 0.1 second so that the interrupt handler gets a chance); - a call to recv at the time of interrupt is not terminated, so we need to use select with timeouts to check for interrupts (we needed to change some interfaces to make this possible); - supposedly we should use the SIOCATMARK ioctl (function ioctlsocket instead of ioctl), but that never indicates we're at the OOB mark, so we needed to skip that. diffs (truncated from 463 to 300 lines): diff --git a/clients/Tests/exports.stable.out b/clients/Tests/exports.stable.out --- a/clients/Tests/exports.stable.out +++ b/clients/Tests/exports.stable.out @@ -717,6 +717,7 @@ MapiMsg mapi_seek_row(MapiHdl hdl, int64 MapiHdl mapi_send(Mapi mid, const char *cmd) __attribute__((__nonnull__(1))); MapiMsg mapi_setAutocommit(Mapi mid, bool autocommit) __attribute__((__nonnull__(1))); MapiMsg mapi_set_columnar_protocol(Mapi mid, bool columnar_protocol) __attribute__((__nonnull__(1))); +MapiMsg mapi_set_rtimeout(Mapi mid, unsigned int timeout, bool (*callback)(void *), void *callback_data) __attribute__((__nonnull__(1))); MapiMsg mapi_set_size_header(Mapi mid, bool value) __attribute__((__nonnull__(1))); MapiMsg mapi_set_time_zone(Mapi mid, int seconds_east_of_utc) __attribute__((__nonnull__(1))); MapiMsg mapi_set_timeout(Mapi mid, unsigned int timeout, bool (*callback)(void *), void *callback_data) __attribute__((__nonnull__(1))); @@ -1738,6 +1739,7 @@ int mnstr_readStr(stream *restrict s, ch ssize_t mnstr_read_block(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt); ssize_t mnstr_readline(stream *restrict s, void *restrict buf, size_t maxcnt); void mnstr_set_bigendian(stream *s, bool bigendian); +void mnstr_set_error(stream *s, mnstr_error_kind kind, const char *fmt, ...) __attribute__((__format__(__printf__, 3, 4))); void mnstr_settimeout(stream *s, unsigned int ms, bool (*func)(void *), void *data); const char *mnstr_version(void); ssize_t mnstr_write(stream *restrict s, const void *restrict buf, size_t elmsize, size_t cnt); diff --git a/clients/mapiclient/mclient.c b/clients/mapiclient/mclient.c --- a/clients/mapiclient/mclient.c +++ b/clients/mapiclient/mclient.c @@ -25,6 +25,7 @@ # include "getopt.h" # endif #endif +#include "stream.h" #include "mapi.h" #include <unistd.h> #include <string.h> @@ -38,7 +39,6 @@ #include <readline/history.h> #include "ReadlineTools.h" #endif -#include "stream.h" #include "msqldump.h" #define LIBMUTILS 1 #include "mprompt.h" @@ -1308,6 +1308,10 @@ sigint_handler(int signum) (void) signum; state = INTERRUPT; +#ifndef HAVE_SIGACTION + if (signal(signum, sigint_handler) == SIG_ERR) + perror("Could not reinstall sigal handler"); +#endif #ifdef HAVE_LIBREADLINE readline_int_handler(); #endif @@ -2289,7 +2293,7 @@ doFile(Mapi mid, stream *fp, bool useins char *newbuf; state = READING; l = mnstr_readline(fp, buf + length, bufsiz - length); - if (l == -1 && state == INTERRUPT) { + if (l <= 0 && state == INTERRUPT) { /* we were interrupted */ mnstr_clearerr(fp); mnstr_write(toConsole, "\n", 1, 1); @@ -3293,8 +3297,19 @@ isfile(FILE *fp) return true; } +static bool +interrupted(void *m) +{ + Mapi mid = m; + if (state == INTERRUPT) { + mnstr_set_error(mapi_get_from(mid), MNSTR_INTERRUPT, NULL); + return true; + } + return false; +} + static void -catch_interrupts(void) +catch_interrupts(Mapi mid) { #ifdef HAVE_SIGACTION struct sigaction sa; @@ -3309,6 +3324,7 @@ catch_interrupts(void) perror("Could not install signal handler"); } #endif + mapi_set_rtimeout(mid, 100, interrupted, mid); } int @@ -3743,7 +3759,7 @@ main(int argc, char **argv) if (!has_fileargs && command == NULL && isatty(fileno(stdin))) { char *lang; - catch_interrupts(); + catch_interrupts(mid); if (mode == SQL) { lang = "/SQL"; @@ -3843,7 +3859,7 @@ main(int argc, char **argv) if (s == NULL) { if (strcmp(arg, "-") == 0) { - catch_interrupts(); + catch_interrupts(mid); s = stdin_rastream(); } else { s = open_rastream(arg); diff --git a/clients/mapilib/mapi.c b/clients/mapilib/mapi.c --- a/clients/mapilib/mapi.c +++ b/clients/mapilib/mapi.c @@ -2401,6 +2401,16 @@ prepareQuery(MapiHdl hdl, const char *cm MapiMsg +mapi_set_rtimeout(Mapi mid, unsigned int timeout, bool (*callback)(void *), void *callback_data) +{ + mapi_check(mid); + if (mid->trace) + printf("Set timeout to %u\n", timeout); + mnstr_settimeout(mid->from, timeout, callback, callback_data); + return MOK; +} + +MapiMsg mapi_set_timeout(Mapi mid, unsigned int timeout, bool (*callback)(void *), void *callback_data) { mapi_check(mid); diff --git a/clients/mapilib/mapi.h b/clients/mapilib/mapi.h --- a/clients/mapilib/mapi.h +++ b/clients/mapilib/mapi.h @@ -208,6 +208,8 @@ mapi_export MapiMsg mapi_cache_freeup(Ma mapi_export MapiMsg mapi_seek_row(MapiHdl hdl, int64_t rowne, int whence) __attribute__((__nonnull__(1))); +mapi_export MapiMsg mapi_set_rtimeout(Mapi mid, unsigned int timeout, bool (*callback)(void *), void *callback_data) + __attribute__((__nonnull__(1))); mapi_export MapiMsg mapi_set_timeout(Mapi mid, unsigned int timeout, bool (*callback)(void *), void *callback_data) __attribute__((__nonnull__(1))); mapi_export MapiMsg mapi_timeout(Mapi mid, unsigned int time) diff --git a/common/stream/socket_stream.c b/common/stream/socket_stream.c --- a/common/stream/socket_stream.c +++ b/common/stream/socket_stream.c @@ -37,24 +37,26 @@ socket_getoob(const stream *s) }; if (poll(&pfd, 1, 0) > 0) #else - fd_set fds; + fd_set xfds; struct timeval t = (struct timeval) { .tv_sec = 0, .tv_usec = 0, }; +#ifndef _MSC_VER #ifdef FD_SETSIZE if (fd >= FD_SETSIZE) return 0; #endif - FD_ZERO(&fds); - FD_SET(fd, &fds); +#endif + FD_ZERO(&xfds); + FD_SET(fd, &xfds); if (select( #ifdef _MSC_VER 0, /* ignored on Windows */ #else fd + 1, #endif - NULL, NULL, &fds, &t) > 0) + NULL, NULL, &xfds, &t) > 0) #endif { #ifdef HAVE_POLL @@ -63,10 +65,11 @@ socket_getoob(const stream *s) if ((pfd.revents & POLLPRI) == 0) return -1; #else - if (!FD_ISSET(fd, &fds)) + if (!FD_ISSET(fd, &xfds)) return 0; #endif /* discard regular data until OOB mark */ +#ifndef _MSC_VER /* Windows has to be different... */ for (;;) { int atmark = 0; char flush[100]; @@ -81,6 +84,7 @@ socket_getoob(const stream *s) break; } } +#endif char b = 0; switch (recv(fd, &b, 1, MSG_OOB)) { case 0: @@ -107,7 +111,7 @@ socket_putoob(const stream *s, char val) return 0; } -#ifdef AF_UNIX +#ifdef HAVE_SYS_UN_H /* UNIX domain sockets do not support OOB messages, so we need to do * something different */ #define OOBMSG0 '\377' /* the two special bytes we send as "OOB" */ @@ -129,10 +133,12 @@ socket_getoob_unix(const stream *s) .tv_sec = 0, .tv_usec = 0, }; +#ifndef _MSC_VER #ifdef FD_SETSIZE if (fd >= FD_SETSIZE) return 0; #endif +#endif FD_ZERO(&fds); FD_SET(fd, &fds); if (select( @@ -176,7 +182,7 @@ static ssize_t socket_write(stream *restrict s, const void *restrict buf, size_t elmsize, size_t cnt) { size_t size = elmsize * cnt, res = 0; -#ifdef NATIVE_WIN32 +#ifdef _MSC_VER int nr = 0; #else ssize_t nr = 0; @@ -191,12 +197,14 @@ socket_write(stream *restrict s, const v errno = 0; while (res < size && ( -#ifdef NATIVE_WIN32 - /* send works on int, make sure the argument fits */ - ((nr = send(s->stream_data.s, (const char *) buf + res, (int) min(size - res, 1 << 16), 0)) > 0) + /* Windows send works on int, make sure the argument fits */ + ((nr = send(s->stream_data.s, (const char *) buf + res, +#ifdef _MSC_VER + (int) min(size - res, 1 << 16) #else - ((nr = write(s->stream_data.s, (const char *) buf + res, size - res)) > 0) + size #endif + , 0)) > 0) || (nr < 0 && /* syscall failed */ s->timeout > 0 && /* potentially timeout */ #ifdef _MSC_VER @@ -210,7 +218,7 @@ socket_write(stream *restrict s, const v #endif s->timeout_func != NULL && /* callback function exists */ !s->timeout_func(s->timeout_data)) /* callback says don't stop */ - ||(nr < 0 && + || (nr < 0 && #ifdef _MSC_VER WSAGetLastError() == WSAEINTR #else @@ -275,7 +283,7 @@ socket_read(stream *restrict s, void *re pfd = (struct pollfd) {.fd = s->stream_data.s, .events = POLLIN}; -#ifdef AF_UNIX +#ifdef HAVE_SYS_UN_H if (s->putoob != socket_putoob_unix) pfd.events |= POLLPRI; #endif @@ -318,7 +326,7 @@ socket_read(stream *restrict s, void *re } #else struct timeval tv; - fd_set fds; + fd_set fds, xfds; errno = 0; #ifdef _MSC_VER @@ -326,6 +334,8 @@ socket_read(stream *restrict s, void *re #endif FD_ZERO(&fds); FD_SET(s->stream_data.s, &fds); + FD_ZERO(&xfds); + FD_SET(s->stream_data.s, &xfds); tv.tv_sec = s->timeout / 1000; tv.tv_usec = (s->timeout % 1000) * 1000; ret = select( @@ -334,11 +344,43 @@ socket_read(stream *restrict s, void *re #else s->stream_data.s + 1, #endif - &fds, NULL, NULL, &tv); + &fds, NULL, &xfds, &tv); if (ret == SOCKET_ERROR) { mnstr_set_error_errno(s, MNSTR_READ_ERROR, "select"); return -1; } + if (ret > 0 && FD_ISSET(s->stream_data.s, &xfds)) { + /* discard regular data until OOB mark */ +#ifndef _MSC_VER /* Windows has to be different... */ + for (;;) { + int atmark = 0; + char flush[100]; + if (ioctlsocket(s->stream_data.s, SIOCATMARK, &atmark) < 0) { + perror("ioctl"); + break; + } + if (atmark) + break; _______________________________________________ checkin-list mailing list -- checkin-list@monetdb.org To unsubscribe send an email to checkin-list-le...@monetdb.org