Changeset: 159e9f96a32c for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/159e9f96a32c
Modified Files:
	clients/Tests/exports.stable.out
	clients/mapiclient/mclient.c
	clients/mapilib/mapi.c
	common/stream/bs.c
	common/stream/bstream.c
	common/stream/socket_stream.c
	common/stream/stream.c
	common/stream/stream.h
	common/stream/stream_internal.h
	gdk/gdk.h
	gdk/gdk_system.h
	monetdb5/mal/mal_client.c
	monetdb5/mal/mal_errors.h
	monetdb5/mal/mal_import.c
	monetdb5/mal/mal_interpreter.c
	tools/monetdbe/monetdbe.c
Branch: client_interrupts
Log Message:
First version that successfully aborts a query on client interrupt. diffs (truncated from 671 to 300 lines): diff --git a/clients/Tests/exports.stable.out b/clients/Tests/exports.stable.out --- a/clients/Tests/exports.stable.out +++ b/clients/Tests/exports.stable.out @@ -1642,6 +1642,7 @@ stream *block_stream(stream *s); stream *bs_stream(stream *s); bstream *bstream_create(stream *rs, size_t chunk_size); void bstream_destroy(bstream *s); +int bstream_getoob(bstream *s); ssize_t bstream_next(bstream *s); ssize_t bstream_read(bstream *s, size_t size); buffer *buffer_create(size_t size); @@ -1681,12 +1682,14 @@ int mnstr_fsetpos(stream *restrict s, fp int mnstr_fsync(stream *s); buffer *mnstr_get_buffer(stream *s); bool mnstr_get_swapbytes(const stream *s); +int mnstr_getoob(const stream *s); int mnstr_init(void); int mnstr_isalive(const stream *s); bool mnstr_isbinary(const stream *s); const char *mnstr_name(const stream *s); const char *mnstr_peek_error(const stream *s); int mnstr_printf(stream *restrict s, _In_z_ _Printf_format_string_ const char *restrict format, ...) 
__attribute__((__format__(__printf__, 2, 3))); +int mnstr_putoob(const stream *s, char val); ssize_t mnstr_read(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt); int mnstr_readBte(stream *restrict s, int8_t *restrict val); int mnstr_readBteArray(stream *restrict s, int8_t *restrict val, size_t cnt); diff --git a/clients/mapiclient/mclient.c b/clients/mapiclient/mclient.c --- a/clients/mapiclient/mclient.c +++ b/clients/mapiclient/mclient.c @@ -1414,7 +1414,7 @@ SQLpagemove(int *len, int fields, int *p SQLseparator(len, fields, '-'); } -static bool stopped; +static volatile sig_atomic_t stopped; static void renderer_sigint_handler(int signum) diff --git a/clients/mapilib/mapi.c b/clients/mapilib/mapi.c --- a/clients/mapilib/mapi.c +++ b/clients/mapilib/mapi.c @@ -2698,7 +2698,14 @@ read_line(Mapi mid) /* fetch one more block */ if (mid->trace) printf("fetch next block: start at:%d\n", mid->blk.end); - len = mnstr_read(mid->from, mid->blk.buf + mid->blk.end, 1, BLOCK); + for (;;) { + len = mnstr_read(mid->from, mid->blk.buf + mid->blk.end, 1, BLOCK); + if (len == -1 && mnstr_errnr(mid->from) == MNSTR_INTERRUPT) { + mnstr_clearerr(mid->from); + mnstr_putoob(mid->to, 1); + } else + break; + } check_stream(mid, mid->from, "Connection terminated during read line", (mid->blk.eos = true, (char *) 0)); mapi_log_data(mid, "RECV", mid->blk.buf + mid->blk.end, len); mid->blk.buf[mid->blk.end + len] = 0; @@ -3360,9 +3367,8 @@ read_file(MapiHdl hdl, uint64_t off, cha static void compute_sigint_handler(int signum) { - if (signum == SIGINT) { - printf("Caught sigint while computing. Ignoring\n"); - } + /* do nothing, but we do need to interrupt the */ + (void) signum; } /* Read ahead and cache data read. 
Depending on the second argument, @@ -3384,9 +3390,12 @@ read_into_cache(MapiHdl hdl, int lookahe char *line; Mapi mid; struct MapiResultSet *result; - void (*prev_handler)(int); - - prev_handler = NULL; +#ifdef HAVE_SIGACTION + struct sigaction osa = {0}; +#else + void (*prev_handler)(int) = NULL; +#endif + mid = hdl->mid; assert(mid->active == hdl); if (hdl->needmore) { @@ -3397,17 +3406,35 @@ read_into_cache(MapiHdl hdl, int lookahe if ((result = hdl->active) == NULL) result = hdl->result; /* may also be NULL */ +#ifdef HAVE_SIGACTION + struct sigaction sa; + (void) sigemptyset(&sa.sa_mask); + sa.sa_flags = 0; + sa.sa_handler = compute_sigint_handler; + if (sigaction(SIGINT, &sa, &osa) == -1) { + perror("mapi_execute_internal: could not install signal handler"); + (void) sigemptyset(&osa.sa_mask); + osa.sa_flags = 0; + osa.sa_handler = SIG_DFL; + } +#else prev_handler = signal(SIGINT, compute_sigint_handler); if (prev_handler == SIG_ERR) { perror("mapi_execute_internal: could not install signal handler"); prev_handler = NULL; } +#endif for (;;) { line = read_line(mid); if (line == NULL) { +#ifdef HAVE_SIGACTION + if (sigaction(SIGINT, &osa, NULL) == -1) + perror("mapi_execute_internal: Could not restore previous handler"); +#else if (prev_handler && signal(SIGINT, prev_handler) == SIG_ERR) perror("mapi_execute_internal: Could not restore previous handler"); +#endif if (mid->from && mnstr_eof(mid->from)) { return mapi_setError(mid, "unexpected end of file", __func__, MERROR); } @@ -3467,8 +3494,13 @@ read_into_cache(MapiHdl hdl, int lookahe } continue; } +#ifdef HAVE_SIGACTION + if (sigaction(SIGINT, &osa, NULL) == -1) + perror("mapi_execute_internal: Could not restore previous handler"); +#else if (prev_handler && signal(SIGINT, prev_handler) == SIG_ERR) perror("mapi_execute_internal: Could not restore previous handler"); +#endif return mid->error; case '!': /* start a new result set if we don't have one @@ -3505,8 +3537,13 @@ read_into_cache(MapiHdl hdl, int 
lookahe (result->querytype == -1 /* unknown (not SQL) */ || result->querytype == Q_TABLE || result->querytype == Q_UPDATE)) { +#ifdef HAVE_SIGACTION + if (sigaction(SIGINT, &osa, NULL) == -1) + perror("mapi_execute_internal: Could not restore previous handler"); +#else if (prev_handler && signal(SIGINT, prev_handler) == SIG_ERR) perror("mapi_execute_internal: Could not restore previous handler"); +#endif return mid->error; } break; diff --git a/common/stream/bs.c b/common/stream/bs.c --- a/common/stream/bs.c +++ b/common/stream/bs.c @@ -372,6 +372,7 @@ block_stream(stream *s) ns->write = bs_write; ns->close = bs_close; ns->destroy = bs_destroy; + ns->clrerr = bs_clrerr; ns->stream_data.p = (void *) b; return ns; diff --git a/common/stream/bstream.c b/common/stream/bstream.c --- a/common/stream/bstream.c +++ b/common/stream/bstream.c @@ -28,19 +28,19 @@ bstream_create(stream *s, size_t size) return NULL; if ((b = malloc(sizeof(*b))) == NULL) return NULL; + if (size == 0) + size = BUFSIZ; *b = (bstream) { .mode = size, .s = s, .eof = false, + .size = size, + .buf = malloc(size + 1 + 1), }; - if (size == 0) - size = BUFSIZ; - b->buf = malloc(size + 1 + 1); if (b->buf == NULL) { free(b); return NULL; } - b->size = size; return b; } @@ -200,3 +200,10 @@ bstream_destroy(bstream *s) } } +int +bstream_getoob(bstream *s) +{ + if (s && s->s) + return mnstr_getoob(s->s); + return 0; +} diff --git a/common/stream/socket_stream.c b/common/stream/socket_stream.c --- a/common/stream/socket_stream.c +++ b/common/stream/socket_stream.c @@ -181,10 +181,8 @@ socket_read(stream *restrict s, void *re } #else nr = read(s->stream_data.s, buf, size); - if (nr == -1 && errno == EINTR) - continue; if (nr == -1) { - mnstr_set_error_errno(s, MNSTR_READ_ERROR, NULL); + mnstr_set_error_errno(s, errno == EINTR ? 
MNSTR_INTERRUPT : MNSTR_READ_ERROR, NULL); return -1; } #endif @@ -305,6 +303,33 @@ socket_isalive(const stream *s) #endif } +static int +socket_getoob(const stream *s) +{ + SOCKET fd = s->stream_data.s; + struct pollfd pfd = (struct pollfd) { + .fd = fd, + .events = POLLPRI, + }; + if (poll(&pfd, 1, 0) > 0) { + char b = 0; + recv(fd, &b, 1, MSG_OOB); + return b; + } + return 0; +} + +static int +socket_putoob(const stream *s, char val) +{ + SOCKET fd = s->stream_data.s; + if (send(fd, &val, 1, MSG_OOB) == -1) { + perror("send OOB"); + return -1; + } + return 0; +} + static stream * socket_open(SOCKET sock, const char *name) { @@ -323,6 +348,8 @@ socket_open(SOCKET sock, const char *nam s->stream_data.s = sock; s->update_timeout = socket_update_timeout; s->isalive = socket_isalive; + s->getoob = socket_getoob; + s->putoob = socket_putoob; errno = 0; #ifdef _MSC_VER diff --git a/common/stream/stream.c b/common/stream/stream.c --- a/common/stream/stream.c +++ b/common/stream/stream.c @@ -503,6 +503,8 @@ mnstr_error_kind_description(mnstr_error return "error reading"; case MNSTR_WRITE_ERROR: return "error writing"; + case MNSTR_INTERRUPT: + return "interrupted"; case MNSTR_TIMEOUT: return "timeout"; case MNSTR_UNEXPECTED_EOF: @@ -593,6 +595,22 @@ mnstr_isalive(const stream *s) return 1; } +int +mnstr_getoob(const stream *s) +{ + if (s->getoob) + return s->getoob(s); + return 0; +} + +int +mnstr_putoob(const stream *s, char val) +{ + if (s->putoob) + return s->putoob(s, val); + return 0; +} + bool mnstr_eof(const stream *s) @@ -830,6 +848,20 @@ wrapper_isalive(const stream *s) } +static int +wrapper_getoob(const stream *s) +{ + return s->inner->getoob(s->inner); +} _______________________________________________ checkin-list mailing list -- checkin-list@monetdb.org To unsubscribe send an email to checkin-list-le...@monetdb.org