Changeset: 78b4cf3f8581 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/78b4cf3f8581
Modified Files:
        clients/Tests/exports.stable.out
Branch: default
Log Message:

Merge with Aug2024 branch.


diffs (truncated from 542 to 300 lines):

diff --git a/clients/Tests/exports.stable.out b/clients/Tests/exports.stable.out
--- a/clients/Tests/exports.stable.out
+++ b/clients/Tests/exports.stable.out
@@ -717,6 +717,7 @@ MapiMsg mapi_seek_row(MapiHdl hdl, int64
 MapiHdl mapi_send(Mapi mid, const char *cmd) __attribute__((__nonnull__(1)));
 MapiMsg mapi_setAutocommit(Mapi mid, bool autocommit) __attribute__((__nonnull__(1)));
 MapiMsg mapi_set_columnar_protocol(Mapi mid, bool columnar_protocol) __attribute__((__nonnull__(1)));
+MapiMsg mapi_set_rtimeout(Mapi mid, unsigned int timeout, bool (*callback)(void *), void *callback_data) __attribute__((__nonnull__(1)));
 MapiMsg mapi_set_size_header(Mapi mid, bool value) __attribute__((__nonnull__(1)));
 MapiMsg mapi_set_time_zone(Mapi mid, int seconds_east_of_utc) __attribute__((__nonnull__(1)));
 MapiMsg mapi_set_timeout(Mapi mid, unsigned int timeout, bool (*callback)(void *), void *callback_data) __attribute__((__nonnull__(1)));
@@ -1738,6 +1739,7 @@ int mnstr_readStr(stream *restrict s, ch
 ssize_t mnstr_read_block(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt);
 ssize_t mnstr_readline(stream *restrict s, void *restrict buf, size_t maxcnt);
 void mnstr_set_bigendian(stream *s, bool bigendian);
+void mnstr_set_error(stream *s, mnstr_error_kind kind, const char *fmt, ...) __attribute__((__format__(__printf__, 3, 4)));
 void mnstr_settimeout(stream *s, unsigned int ms, bool (*func)(void *), void *data);
 const char *mnstr_version(void);
 ssize_t mnstr_write(stream *restrict s, const void *restrict buf, size_t elmsize, size_t cnt);
diff --git a/clients/mapiclient/mclient.c b/clients/mapiclient/mclient.c
--- a/clients/mapiclient/mclient.c
+++ b/clients/mapiclient/mclient.c
@@ -25,6 +25,7 @@
 #  include "getopt.h"
 # endif
 #endif
+#include "stream.h"
 #include "mapi.h"
 #include <unistd.h>
 #include <string.h>
@@ -38,7 +39,6 @@
 #include <readline/history.h>
 #include "ReadlineTools.h"
 #endif
-#include "stream.h"
 #include "msqldump.h"
 #define LIBMUTILS 1
 #include "mprompt.h"
@@ -1308,6 +1308,10 @@ sigint_handler(int signum)
        (void) signum;
 
        state = INTERRUPT;
+#ifndef HAVE_SIGACTION
+       if (signal(signum, sigint_handler) == SIG_ERR)
+               perror("Could not reinstall sigal handler");
+#endif
 #ifdef HAVE_LIBREADLINE
        readline_int_handler();
 #endif
@@ -2289,7 +2293,7 @@ doFile(Mapi mid, stream *fp, bool useins
                        char *newbuf;
                        state = READING;
                        l = mnstr_readline(fp, buf + length, bufsiz - length);
-                       if (l == -1 && state == INTERRUPT) {
+                       if (l <= 0 && state == INTERRUPT) {
                                /* we were interrupted */
                                mnstr_clearerr(fp);
                                mnstr_write(toConsole, "\n", 1, 1);
@@ -3293,8 +3297,19 @@ isfile(FILE *fp)
        return true;
 }
 
+static bool
+interrupted(void *m)
+{
+       Mapi mid = m;
+       if (state == INTERRUPT) {
+               mnstr_set_error(mapi_get_from(mid), MNSTR_INTERRUPT, NULL);
+               return true;
+       }
+       return false;
+}
+
 static void
-catch_interrupts(void)
+catch_interrupts(Mapi mid)
 {
 #ifdef HAVE_SIGACTION
        struct sigaction sa;
@@ -3309,6 +3324,7 @@ catch_interrupts(void)
                perror("Could not install signal handler");
        }
 #endif
+       mapi_set_rtimeout(mid, 100, interrupted, mid);
 }
 
 int
@@ -3743,7 +3759,7 @@ main(int argc, char **argv)
        if (!has_fileargs && command == NULL && isatty(fileno(stdin))) {
                char *lang;
 
-               catch_interrupts();
+               catch_interrupts(mid);
 
                if (mode == SQL) {
                        lang = "/SQL";
@@ -3843,7 +3859,7 @@ main(int argc, char **argv)
 
                        if (s == NULL) {
                                if (strcmp(arg, "-") == 0) {
-                                       catch_interrupts();
+                                       catch_interrupts(mid);
                                        s = stdin_rastream();
                                } else {
                                        s = open_rastream(arg);
diff --git a/clients/mapilib/mapi.c b/clients/mapilib/mapi.c
--- a/clients/mapilib/mapi.c
+++ b/clients/mapilib/mapi.c
@@ -2401,6 +2401,16 @@ prepareQuery(MapiHdl hdl, const char *cm
 
 
 MapiMsg
+mapi_set_rtimeout(Mapi mid, unsigned int timeout, bool (*callback)(void *), void *callback_data)
+{
+       mapi_check(mid);
+       if (mid->trace)
+               printf("Set timeout to %u\n", timeout);
+       mnstr_settimeout(mid->from, timeout, callback, callback_data);
+       return MOK;
+}
+
+MapiMsg
 mapi_set_timeout(Mapi mid, unsigned int timeout, bool (*callback)(void *), void *callback_data)
 {
        mapi_check(mid);
diff --git a/clients/mapilib/mapi.h b/clients/mapilib/mapi.h
--- a/clients/mapilib/mapi.h
+++ b/clients/mapilib/mapi.h
@@ -208,6 +208,8 @@ mapi_export MapiMsg mapi_cache_freeup(Ma
 mapi_export MapiMsg mapi_seek_row(MapiHdl hdl, int64_t rowne, int whence)
        __attribute__((__nonnull__(1)));
 
+mapi_export MapiMsg mapi_set_rtimeout(Mapi mid, unsigned int timeout, bool (*callback)(void *), void *callback_data)
+       __attribute__((__nonnull__(1)));
 mapi_export MapiMsg mapi_set_timeout(Mapi mid, unsigned int timeout, bool (*callback)(void *), void *callback_data)
        __attribute__((__nonnull__(1)));
 mapi_export MapiMsg mapi_timeout(Mapi mid, unsigned int time)
diff --git a/common/stream/socket_stream.c b/common/stream/socket_stream.c
--- a/common/stream/socket_stream.c
+++ b/common/stream/socket_stream.c
@@ -37,24 +37,26 @@ socket_getoob(const stream *s)
        };
        if (poll(&pfd, 1, 0) > 0)
 #else
-       fd_set fds;
+       fd_set xfds;
        struct timeval t = (struct timeval) {
                .tv_sec = 0,
                .tv_usec = 0,
        };
+#ifndef _MSC_VER
 #ifdef FD_SETSIZE
        if (fd >= FD_SETSIZE)
                return 0;
 #endif
-       FD_ZERO(&fds);
-       FD_SET(fd, &fds);
+#endif
+       FD_ZERO(&xfds);
+       FD_SET(fd, &xfds);
        if (select(
 #ifdef _MSC_VER
                        0,      /* ignored on Windows */
 #else
                        fd + 1,
 #endif
-                       NULL, NULL, &fds, &t) > 0)
+                       NULL, NULL, &xfds, &t) > 0)
 #endif
        {
 #ifdef HAVE_POLL
@@ -63,10 +65,11 @@ socket_getoob(const stream *s)
                if ((pfd.revents & POLLPRI) == 0)
                        return -1;
 #else
-               if (!FD_ISSET(fd, &fds))
+               if (!FD_ISSET(fd, &xfds))
                        return 0;
 #endif
                /* discard regular data until OOB mark */
+#ifndef _MSC_VER                               /* Windows has to be different... */
                for (;;) {
                        int atmark = 0;
                        char flush[100];
@@ -81,6 +84,7 @@ socket_getoob(const stream *s)
                                break;
                        }
                }
+#endif
                char b = 0;
                switch (recv(fd, &b, 1, MSG_OOB)) {
                case 0:
@@ -107,7 +111,7 @@ socket_putoob(const stream *s, char val)
        return 0;
 }
 
-#ifdef AF_UNIX
+#ifdef HAVE_SYS_UN_H
 /* UNIX domain sockets do not support OOB messages, so we need to do
  * something different */
 #define OOBMSG0        '\377'                  /* the two special bytes we send as "OOB" */
@@ -129,10 +133,12 @@ socket_getoob_unix(const stream *s)
                .tv_sec = 0,
                .tv_usec = 0,
        };
+#ifndef _MSC_VER
 #ifdef FD_SETSIZE
        if (fd >= FD_SETSIZE)
                return 0;
 #endif
+#endif
        FD_ZERO(&fds);
        FD_SET(fd, &fds);
        if (select(
@@ -176,7 +182,7 @@ static ssize_t
 socket_write(stream *restrict s, const void *restrict buf, size_t elmsize, size_t cnt)
 {
        size_t size = elmsize * cnt, res = 0;
-#ifdef NATIVE_WIN32
+#ifdef _MSC_VER
        int nr = 0;
 #else
        ssize_t nr = 0;
@@ -191,12 +197,14 @@ socket_write(stream *restrict s, const v
        errno = 0;
        while (res < size &&
               (
-#ifdef NATIVE_WIN32
-                      /* send works on int, make sure the argument fits */
-                      ((nr = send(s->stream_data.s, (const char *) buf + res, (int) min(size - res, 1 << 16), 0)) > 0)
+                      /* Windows send works on int, make sure the argument fits */
+                      ((nr = send(s->stream_data.s, (const char *) buf + res,
+#ifdef _MSC_VER
+                                                  (int) min(size - res, 1 << 16)
 #else
-                      ((nr = write(s->stream_data.s, (const char *) buf + res, size - res)) > 0)
+                                                  size
 #endif
+                                                  , 0)) > 0)
                       || (nr < 0 &&    /* syscall failed */
                           s->timeout > 0 &&    /* potentially timeout */
 #ifdef _MSC_VER
@@ -210,7 +218,7 @@ socket_write(stream *restrict s, const v
 #endif
                           s->timeout_func != NULL &&   /* callback function exists */
                           !s->timeout_func(s->timeout_data))   /* callback says don't stop */
-                      ||(nr < 0 &&
+                      || (nr < 0 &&
 #ifdef _MSC_VER
                          WSAGetLastError() == WSAEINTR
 #else
@@ -275,20 +283,82 @@ socket_read(stream *restrict s, void *re
 
                        pfd = (struct pollfd) {.fd = s->stream_data.s,
                                               .events = POLLIN};
-#ifdef AF_UNIX
+#ifdef HAVE_SYS_UN_H
                        if (s->putoob != socket_putoob_unix)
                                pfd.events |= POLLPRI;
 #endif
 
                        ret = poll(&pfd, 1, (int) s->timeout);
-                       if (ret == -1 && errno == EINTR)
-                               continue;
-                       if (ret == -1 || (pfd.revents & POLLERR)) {
+                       if (ret == -1) {
+                               if (errno == EINTR)
+                                       continue;
                                mnstr_set_error_errno(s, MNSTR_READ_ERROR, "poll error");
                                return -1;
                        }
-                       if (ret == 1 && pfd.revents & POLLPRI) {
+                       if (ret == 1) {
+                               if (pfd.revents & POLLHUP) {
+                                       /* hung up, return EOF */
+                                       s->eof = true;
+                                       return 0;
+                               }
+                               if (pfd.revents & POLLPRI) {
+                                       /* discard regular data until OOB mark */
+                                       for (;;) {
+                                               int atmark = 0;
+                                               char flush[100];
+                                               if (ioctlsocket(s->stream_data.s, SIOCATMARK, &atmark) < 0) {
+                                                       perror("ioctl");
+                                                       break;
+                                               }
+                                               if (atmark)
+                                                       break;
+                                               if (recv(s->stream_data.s, flush, sizeof(flush), 0) < 0) {
+                                                       perror("recv");
+                                                       break;
+                                               }
+                                       }
+                                       char b = 0;
+                                       switch (recv(s->stream_data.s, &b, 1, MSG_OOB)) {
+                                       case 0:
+                                               /* unexpectedly didn't receive a byte */
+                                               continue;
+                                       case 1:
_______________________________________________
checkin-list mailing list -- checkin-list@monetdb.org
To unsubscribe send an email to checkin-list-le...@monetdb.org

Reply via email to