Changeset: 159e9f96a32c for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/159e9f96a32c
Modified Files:
        clients/Tests/exports.stable.out
        clients/mapiclient/mclient.c
        clients/mapilib/mapi.c
        common/stream/bs.c
        common/stream/bstream.c
        common/stream/socket_stream.c
        common/stream/stream.c
        common/stream/stream.h
        common/stream/stream_internal.h
        gdk/gdk.h
        gdk/gdk_system.h
        monetdb5/mal/mal_client.c
        monetdb5/mal/mal_errors.h
        monetdb5/mal/mal_import.c
        monetdb5/mal/mal_interpreter.c
        tools/monetdbe/monetdbe.c
Branch: client_interrupts
Log Message:

First version that successfully aborts a query on client interrupt.


diffs (truncated from 671 to 300 lines):

diff --git a/clients/Tests/exports.stable.out b/clients/Tests/exports.stable.out
--- a/clients/Tests/exports.stable.out
+++ b/clients/Tests/exports.stable.out
@@ -1642,6 +1642,7 @@ stream *block_stream(stream *s);
 stream *bs_stream(stream *s);
 bstream *bstream_create(stream *rs, size_t chunk_size);
 void bstream_destroy(bstream *s);
+int bstream_getoob(bstream *s);
 ssize_t bstream_next(bstream *s);
 ssize_t bstream_read(bstream *s, size_t size);
 buffer *buffer_create(size_t size);
@@ -1681,12 +1682,14 @@ int mnstr_fsetpos(stream *restrict s, fp
 int mnstr_fsync(stream *s);
 buffer *mnstr_get_buffer(stream *s);
 bool mnstr_get_swapbytes(const stream *s);
+int mnstr_getoob(const stream *s);
 int mnstr_init(void);
 int mnstr_isalive(const stream *s);
 bool mnstr_isbinary(const stream *s);
 const char *mnstr_name(const stream *s);
 const char *mnstr_peek_error(const stream *s);
 int mnstr_printf(stream *restrict s, _In_z_ _Printf_format_string_ const char 
*restrict format, ...) __attribute__((__format__(__printf__, 2, 3)));
+int mnstr_putoob(const stream *s, char val);
 ssize_t mnstr_read(stream *restrict s, void *restrict buf, size_t elmsize, 
size_t cnt);
 int mnstr_readBte(stream *restrict s, int8_t *restrict val);
 int mnstr_readBteArray(stream *restrict s, int8_t *restrict val, size_t cnt);
diff --git a/clients/mapiclient/mclient.c b/clients/mapiclient/mclient.c
--- a/clients/mapiclient/mclient.c
+++ b/clients/mapiclient/mclient.c
@@ -1414,7 +1414,7 @@ SQLpagemove(int *len, int fields, int *p
                SQLseparator(len, fields, '-');
 }
 
-static bool stopped;
+static volatile sig_atomic_t stopped;
 
 static void
 renderer_sigint_handler(int signum)
diff --git a/clients/mapilib/mapi.c b/clients/mapilib/mapi.c
--- a/clients/mapilib/mapi.c
+++ b/clients/mapilib/mapi.c
@@ -2698,7 +2698,14 @@ read_line(Mapi mid)
                /* fetch one more block */
                if (mid->trace)
                        printf("fetch next block: start at:%d\n", mid->blk.end);
-               len = mnstr_read(mid->from, mid->blk.buf + mid->blk.end, 1, 
BLOCK);
+               for (;;) {
+                       len = mnstr_read(mid->from, mid->blk.buf + 
mid->blk.end, 1, BLOCK);
+                       if (len == -1 && mnstr_errnr(mid->from) == 
MNSTR_INTERRUPT) {
+                               mnstr_clearerr(mid->from);
+                               mnstr_putoob(mid->to, 1);
+                       } else
+                               break;
+               }
                check_stream(mid, mid->from, "Connection terminated during read 
line", (mid->blk.eos = true, (char *) 0));
                mapi_log_data(mid, "RECV", mid->blk.buf + mid->blk.end, len);
                mid->blk.buf[mid->blk.end + len] = 0;
@@ -3360,9 +3367,8 @@ read_file(MapiHdl hdl, uint64_t off, cha
 static void
 compute_sigint_handler(int signum)
 {
-       if (signum == SIGINT) {
-               printf("Caught sigint while computing. Ignoring\n");
-       }
+       /* do nothing, but we do need to interrupt the  */
+       (void) signum;
 }
 
 /* Read ahead and cache data read.  Depending on the second argument,
@@ -3384,9 +3390,12 @@ read_into_cache(MapiHdl hdl, int lookahe
        char *line;
        Mapi mid;
        struct MapiResultSet *result;
-       void (*prev_handler)(int);
-
-       prev_handler = NULL;
+#ifdef HAVE_SIGACTION
+       struct sigaction osa = {0};
+#else
+       void (*prev_handler)(int) = NULL;
+#endif
+
        mid = hdl->mid;
        assert(mid->active == hdl);
        if (hdl->needmore) {
@@ -3397,17 +3406,35 @@ read_into_cache(MapiHdl hdl, int lookahe
        if ((result = hdl->active) == NULL)
                result = hdl->result;   /* may also be NULL */
 
+#ifdef HAVE_SIGACTION
+       struct sigaction sa;
+       (void) sigemptyset(&sa.sa_mask);
+       sa.sa_flags = 0;
+       sa.sa_handler = compute_sigint_handler;
+       if (sigaction(SIGINT, &sa, &osa) == -1) {
+               perror("mapi_execute_internal: could not install signal 
handler");
+               (void) sigemptyset(&osa.sa_mask);
+               osa.sa_flags = 0;
+               osa.sa_handler = SIG_DFL;
+       }
+#else
        prev_handler = signal(SIGINT, compute_sigint_handler);
        if (prev_handler == SIG_ERR) {
                perror("mapi_execute_internal: could not install signal 
handler");
                prev_handler = NULL;
        }
+#endif
 
        for (;;) {
                line = read_line(mid);
                if (line == NULL) {
+#ifdef HAVE_SIGACTION
+                       if (sigaction(SIGINT, &osa, NULL) == -1)
+                               perror("mapi_execute_internal: Could not 
restore previous handler");
+#else
                        if (prev_handler && signal(SIGINT, prev_handler) == 
SIG_ERR)
                                perror("mapi_execute_internal: Could not 
restore previous handler");
+#endif
                        if (mid->from && mnstr_eof(mid->from)) {
                                return mapi_setError(mid, "unexpected end of 
file", __func__, MERROR);
                        }
@@ -3467,8 +3494,13 @@ read_into_cache(MapiHdl hdl, int lookahe
                                }
                                continue;
                        }
+#ifdef HAVE_SIGACTION
+                       if (sigaction(SIGINT, &osa, NULL) == -1)
+                               perror("mapi_execute_internal: Could not 
restore previous handler");
+#else
                        if (prev_handler && signal(SIGINT, prev_handler) == 
SIG_ERR)
                                perror("mapi_execute_internal: Could not 
restore previous handler");
+#endif
                        return mid->error;
                case '!':
                        /* start a new result set if we don't have one
@@ -3505,8 +3537,13 @@ read_into_cache(MapiHdl hdl, int lookahe
                            (result->querytype == -1 /* unknown (not SQL) */ ||
                             result->querytype == Q_TABLE ||
                             result->querytype == Q_UPDATE)) {
+#ifdef HAVE_SIGACTION
+                               if (sigaction(SIGINT, &osa, NULL) == -1)
+                                       perror("mapi_execute_internal: Could 
not restore previous handler");
+#else
                                if (prev_handler && signal(SIGINT, 
prev_handler) == SIG_ERR)
                                        perror("mapi_execute_internal: Could 
not restore previous handler");
+#endif
                                return mid->error;
                        }
                        break;
diff --git a/common/stream/bs.c b/common/stream/bs.c
--- a/common/stream/bs.c
+++ b/common/stream/bs.c
@@ -372,6 +372,7 @@ block_stream(stream *s)
        ns->write = bs_write;
        ns->close = bs_close;
        ns->destroy = bs_destroy;
+       ns->clrerr = bs_clrerr;
        ns->stream_data.p = (void *) b;
 
        return ns;
diff --git a/common/stream/bstream.c b/common/stream/bstream.c
--- a/common/stream/bstream.c
+++ b/common/stream/bstream.c
@@ -28,19 +28,19 @@ bstream_create(stream *s, size_t size)
                return NULL;
        if ((b = malloc(sizeof(*b))) == NULL)
                return NULL;
+       if (size == 0)
+               size = BUFSIZ;
        *b = (bstream) {
                .mode = size,
                .s = s,
                .eof = false,
+               .size = size,
+               .buf = malloc(size + 1 + 1),
        };
-       if (size == 0)
-               size = BUFSIZ;
-       b->buf = malloc(size + 1 + 1);
        if (b->buf == NULL) {
                free(b);
                return NULL;
        }
-       b->size = size;
        return b;
 }
 
@@ -200,3 +200,10 @@ bstream_destroy(bstream *s)
        }
 }
 
+int
+bstream_getoob(bstream *s)
+{
+       if (s && s->s)
+               return mnstr_getoob(s->s);
+       return 0;
+}
diff --git a/common/stream/socket_stream.c b/common/stream/socket_stream.c
--- a/common/stream/socket_stream.c
+++ b/common/stream/socket_stream.c
@@ -181,10 +181,8 @@ socket_read(stream *restrict s, void *re
                }
 #else
                nr = read(s->stream_data.s, buf, size);
-               if (nr == -1 && errno == EINTR)
-                       continue;
                if (nr == -1) {
-                       mnstr_set_error_errno(s, MNSTR_READ_ERROR, NULL);
+                       mnstr_set_error_errno(s, errno == EINTR ? 
MNSTR_INTERRUPT : MNSTR_READ_ERROR, NULL);
                        return -1;
                }
 #endif
@@ -305,6 +303,33 @@ socket_isalive(const stream *s)
 #endif
 }
 
+static int
+socket_getoob(const stream *s)
+{
+       SOCKET fd = s->stream_data.s;
+       struct pollfd pfd = (struct pollfd) {
+               .fd = fd,
+               .events = POLLPRI,
+       };
+       if (poll(&pfd, 1, 0) > 0) {
+               char b = 0;
+               recv(fd, &b, 1, MSG_OOB);
+               return b;
+       }
+       return 0;
+}
+
+static int
+socket_putoob(const stream *s, char val)
+{
+       SOCKET fd = s->stream_data.s;
+       if (send(fd, &val, 1, MSG_OOB) == -1) {
+               perror("send OOB");
+               return -1;
+       }
+       return 0;
+}
+
 static stream *
 socket_open(SOCKET sock, const char *name)
 {
@@ -323,6 +348,8 @@ socket_open(SOCKET sock, const char *nam
        s->stream_data.s = sock;
        s->update_timeout = socket_update_timeout;
        s->isalive = socket_isalive;
+       s->getoob = socket_getoob;
+       s->putoob = socket_putoob;
 
        errno = 0;
 #ifdef _MSC_VER
diff --git a/common/stream/stream.c b/common/stream/stream.c
--- a/common/stream/stream.c
+++ b/common/stream/stream.c
@@ -503,6 +503,8 @@ mnstr_error_kind_description(mnstr_error
                return "error reading";
        case MNSTR_WRITE_ERROR:
                return "error writing";
+       case MNSTR_INTERRUPT:
+               return "interrupted";
        case MNSTR_TIMEOUT:
                return "timeout";
        case MNSTR_UNEXPECTED_EOF:
@@ -593,6 +595,22 @@ mnstr_isalive(const stream *s)
        return 1;
 }
 
+int
+mnstr_getoob(const stream *s)
+{
+       if (s->getoob)
+               return s->getoob(s);
+       return 0;
+}
+
+int
+mnstr_putoob(const stream *s, char val)
+{
+       if (s->putoob)
+               return s->putoob(s, val);
+       return 0;
+}
+
 
 bool
 mnstr_eof(const stream *s)
@@ -830,6 +848,20 @@ wrapper_isalive(const stream *s)
 }
 
 
+static int
+wrapper_getoob(const stream *s)
+{
+       return s->inner->getoob(s->inner);
+}
_______________________________________________
checkin-list mailing list -- checkin-list@monetdb.org
To unsubscribe send an email to checkin-list-le...@monetdb.org

Reply via email to