Changeset: 78b4cf3f8581 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/78b4cf3f8581
Modified Files: clients/Tests/exports.stable.out
Branch: default
Log Message:
Merge with Aug2024 branch. diffs (truncated from 542 to 300 lines): diff --git a/clients/Tests/exports.stable.out b/clients/Tests/exports.stable.out --- a/clients/Tests/exports.stable.out +++ b/clients/Tests/exports.stable.out @@ -717,6 +717,7 @@ MapiMsg mapi_seek_row(MapiHdl hdl, int64 MapiHdl mapi_send(Mapi mid, const char *cmd) __attribute__((__nonnull__(1))); MapiMsg mapi_setAutocommit(Mapi mid, bool autocommit) __attribute__((__nonnull__(1))); MapiMsg mapi_set_columnar_protocol(Mapi mid, bool columnar_protocol) __attribute__((__nonnull__(1))); +MapiMsg mapi_set_rtimeout(Mapi mid, unsigned int timeout, bool (*callback)(void *), void *callback_data) __attribute__((__nonnull__(1))); MapiMsg mapi_set_size_header(Mapi mid, bool value) __attribute__((__nonnull__(1))); MapiMsg mapi_set_time_zone(Mapi mid, int seconds_east_of_utc) __attribute__((__nonnull__(1))); MapiMsg mapi_set_timeout(Mapi mid, unsigned int timeout, bool (*callback)(void *), void *callback_data) __attribute__((__nonnull__(1))); @@ -1738,6 +1739,7 @@ int mnstr_readStr(stream *restrict s, ch ssize_t mnstr_read_block(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt); ssize_t mnstr_readline(stream *restrict s, void *restrict buf, size_t maxcnt); void mnstr_set_bigendian(stream *s, bool bigendian); +void mnstr_set_error(stream *s, mnstr_error_kind kind, const char *fmt, ...) 
__attribute__((__format__(__printf__, 3, 4))); void mnstr_settimeout(stream *s, unsigned int ms, bool (*func)(void *), void *data); const char *mnstr_version(void); ssize_t mnstr_write(stream *restrict s, const void *restrict buf, size_t elmsize, size_t cnt); diff --git a/clients/mapiclient/mclient.c b/clients/mapiclient/mclient.c --- a/clients/mapiclient/mclient.c +++ b/clients/mapiclient/mclient.c @@ -25,6 +25,7 @@ # include "getopt.h" # endif #endif +#include "stream.h" #include "mapi.h" #include <unistd.h> #include <string.h> @@ -38,7 +39,6 @@ #include <readline/history.h> #include "ReadlineTools.h" #endif -#include "stream.h" #include "msqldump.h" #define LIBMUTILS 1 #include "mprompt.h" @@ -1308,6 +1308,10 @@ sigint_handler(int signum) (void) signum; state = INTERRUPT; +#ifndef HAVE_SIGACTION + if (signal(signum, sigint_handler) == SIG_ERR) + perror("Could not reinstall sigal handler"); +#endif #ifdef HAVE_LIBREADLINE readline_int_handler(); #endif @@ -2289,7 +2293,7 @@ doFile(Mapi mid, stream *fp, bool useins char *newbuf; state = READING; l = mnstr_readline(fp, buf + length, bufsiz - length); - if (l == -1 && state == INTERRUPT) { + if (l <= 0 && state == INTERRUPT) { /* we were interrupted */ mnstr_clearerr(fp); mnstr_write(toConsole, "\n", 1, 1); @@ -3293,8 +3297,19 @@ isfile(FILE *fp) return true; } +static bool +interrupted(void *m) +{ + Mapi mid = m; + if (state == INTERRUPT) { + mnstr_set_error(mapi_get_from(mid), MNSTR_INTERRUPT, NULL); + return true; + } + return false; +} + static void -catch_interrupts(void) +catch_interrupts(Mapi mid) { #ifdef HAVE_SIGACTION struct sigaction sa; @@ -3309,6 +3324,7 @@ catch_interrupts(void) perror("Could not install signal handler"); } #endif + mapi_set_rtimeout(mid, 100, interrupted, mid); } int @@ -3743,7 +3759,7 @@ main(int argc, char **argv) if (!has_fileargs && command == NULL && isatty(fileno(stdin))) { char *lang; - catch_interrupts(); + catch_interrupts(mid); if (mode == SQL) { lang = "/SQL"; @@ -3843,7 
+3859,7 @@ main(int argc, char **argv) if (s == NULL) { if (strcmp(arg, "-") == 0) { - catch_interrupts(); + catch_interrupts(mid); s = stdin_rastream(); } else { s = open_rastream(arg); diff --git a/clients/mapilib/mapi.c b/clients/mapilib/mapi.c --- a/clients/mapilib/mapi.c +++ b/clients/mapilib/mapi.c @@ -2401,6 +2401,16 @@ prepareQuery(MapiHdl hdl, const char *cm MapiMsg +mapi_set_rtimeout(Mapi mid, unsigned int timeout, bool (*callback)(void *), void *callback_data) +{ + mapi_check(mid); + if (mid->trace) + printf("Set timeout to %u\n", timeout); + mnstr_settimeout(mid->from, timeout, callback, callback_data); + return MOK; +} + +MapiMsg mapi_set_timeout(Mapi mid, unsigned int timeout, bool (*callback)(void *), void *callback_data) { mapi_check(mid); diff --git a/clients/mapilib/mapi.h b/clients/mapilib/mapi.h --- a/clients/mapilib/mapi.h +++ b/clients/mapilib/mapi.h @@ -208,6 +208,8 @@ mapi_export MapiMsg mapi_cache_freeup(Ma mapi_export MapiMsg mapi_seek_row(MapiHdl hdl, int64_t rowne, int whence) __attribute__((__nonnull__(1))); +mapi_export MapiMsg mapi_set_rtimeout(Mapi mid, unsigned int timeout, bool (*callback)(void *), void *callback_data) + __attribute__((__nonnull__(1))); mapi_export MapiMsg mapi_set_timeout(Mapi mid, unsigned int timeout, bool (*callback)(void *), void *callback_data) __attribute__((__nonnull__(1))); mapi_export MapiMsg mapi_timeout(Mapi mid, unsigned int time) diff --git a/common/stream/socket_stream.c b/common/stream/socket_stream.c --- a/common/stream/socket_stream.c +++ b/common/stream/socket_stream.c @@ -37,24 +37,26 @@ socket_getoob(const stream *s) }; if (poll(&pfd, 1, 0) > 0) #else - fd_set fds; + fd_set xfds; struct timeval t = (struct timeval) { .tv_sec = 0, .tv_usec = 0, }; +#ifndef _MSC_VER #ifdef FD_SETSIZE if (fd >= FD_SETSIZE) return 0; #endif - FD_ZERO(&fds); - FD_SET(fd, &fds); +#endif + FD_ZERO(&xfds); + FD_SET(fd, &xfds); if (select( #ifdef _MSC_VER 0, /* ignored on Windows */ #else fd + 1, #endif - NULL, NULL, 
&fds, &t) > 0) + NULL, NULL, &xfds, &t) > 0) #endif { #ifdef HAVE_POLL @@ -63,10 +65,11 @@ socket_getoob(const stream *s) if ((pfd.revents & POLLPRI) == 0) return -1; #else - if (!FD_ISSET(fd, &fds)) + if (!FD_ISSET(fd, &xfds)) return 0; #endif /* discard regular data until OOB mark */ +#ifndef _MSC_VER /* Windows has to be different... */ for (;;) { int atmark = 0; char flush[100]; @@ -81,6 +84,7 @@ socket_getoob(const stream *s) break; } } +#endif char b = 0; switch (recv(fd, &b, 1, MSG_OOB)) { case 0: @@ -107,7 +111,7 @@ socket_putoob(const stream *s, char val) return 0; } -#ifdef AF_UNIX +#ifdef HAVE_SYS_UN_H /* UNIX domain sockets do not support OOB messages, so we need to do * something different */ #define OOBMSG0 '\377' /* the two special bytes we send as "OOB" */ @@ -129,10 +133,12 @@ socket_getoob_unix(const stream *s) .tv_sec = 0, .tv_usec = 0, }; +#ifndef _MSC_VER #ifdef FD_SETSIZE if (fd >= FD_SETSIZE) return 0; #endif +#endif FD_ZERO(&fds); FD_SET(fd, &fds); if (select( @@ -176,7 +182,7 @@ static ssize_t socket_write(stream *restrict s, const void *restrict buf, size_t elmsize, size_t cnt) { size_t size = elmsize * cnt, res = 0; -#ifdef NATIVE_WIN32 +#ifdef _MSC_VER int nr = 0; #else ssize_t nr = 0; @@ -191,12 +197,14 @@ socket_write(stream *restrict s, const v errno = 0; while (res < size && ( -#ifdef NATIVE_WIN32 - /* send works on int, make sure the argument fits */ - ((nr = send(s->stream_data.s, (const char *) buf + res, (int) min(size - res, 1 << 16), 0)) > 0) + /* Windows send works on int, make sure the argument fits */ + ((nr = send(s->stream_data.s, (const char *) buf + res, +#ifdef _MSC_VER + (int) min(size - res, 1 << 16) #else - ((nr = write(s->stream_data.s, (const char *) buf + res, size - res)) > 0) + size #endif + , 0)) > 0) || (nr < 0 && /* syscall failed */ s->timeout > 0 && /* potentially timeout */ #ifdef _MSC_VER @@ -210,7 +218,7 @@ socket_write(stream *restrict s, const v #endif s->timeout_func != NULL && /* callback function 
exists */ !s->timeout_func(s->timeout_data)) /* callback says don't stop */ - ||(nr < 0 && + || (nr < 0 && #ifdef _MSC_VER WSAGetLastError() == WSAEINTR #else @@ -275,20 +283,82 @@ socket_read(stream *restrict s, void *re pfd = (struct pollfd) {.fd = s->stream_data.s, .events = POLLIN}; -#ifdef AF_UNIX +#ifdef HAVE_SYS_UN_H if (s->putoob != socket_putoob_unix) pfd.events |= POLLPRI; #endif ret = poll(&pfd, 1, (int) s->timeout); - if (ret == -1 && errno == EINTR) - continue; - if (ret == -1 || (pfd.revents & POLLERR)) { + if (ret == -1) { + if (errno == EINTR) + continue; mnstr_set_error_errno(s, MNSTR_READ_ERROR, "poll error"); return -1; } - if (ret == 1 && pfd.revents & POLLPRI) { + if (ret == 1) { + if (pfd.revents & POLLHUP) { + /* hung up, return EOF */ + s->eof = true; + return 0; + } + if (pfd.revents & POLLPRI) { + /* discard regular data until OOB mark */ + for (;;) { + int atmark = 0; + char flush[100]; + if (ioctlsocket(s->stream_data.s, SIOCATMARK, &atmark) < 0) { + perror("ioctl"); + break; + } + if (atmark) + break; + if (recv(s->stream_data.s, flush, sizeof(flush), 0) < 0) { + perror("recv"); + break; + } + } + char b = 0; + switch (recv(s->stream_data.s, &b, 1, MSG_OOB)) { + case 0: + /* unexpectedly didn't receive a byte */ + continue; + case 1: _______________________________________________ checkin-list mailing list -- checkin-list@monetdb.org To unsubscribe send an email to checkin-list-le...@monetdb.org