Changeset: 930a4f72e4b4 for MonetDB URL: https://dev.monetdb.org/hg/MonetDB/rev/930a4f72e4b4 Modified Files: clients/mapiclient/mclient.c clients/mapilib/mapi.c clients/mapilib/mapi.h clients/mapilib/mapi_intern.h monetdb5/modules/mal/tablet.c monetdb5/modules/mal/tablet.h sql/backends/monet5/sql.c sql/backends/monet5/sql_result.c sql/backends/monet5/sql_scenario.c sql/server/sql_parser.y sql/server/sql_scan.c sql/server/sql_scan.h Branch: client_interrupts Log Message:
Implemented abort on interrupt of ON CLIENT processing. Also implemented abort when ON CLIENT reading fails for some other reason half way through. diffs (truncated from 573 to 300 lines): diff --git a/clients/mapiclient/mclient.c b/clients/mapiclient/mclient.c --- a/clients/mapiclient/mclient.c +++ b/clients/mapiclient/mclient.c @@ -2420,9 +2420,12 @@ doFile(Mapi mid, stream *fp, bool useins mnstr_write(toConsole, "\n", 1, 1); if (hdl) { /* on interrupt when continuing a query, force an error */ - buf[0] = '\200'; - buf[1] = '\n'; - l = 2; + l = 0; + if (mapi_query_abort(hdl, 1) != MOK) { + /* if abort failed, insert something not allowed */ + buf[l++] = '\200'; + } + buf[l++] = '\n'; length = 0; } else { /* not continuing; just repeat */ @@ -3060,6 +3063,7 @@ doFile(Mapi mid, stream *fp, bool useins struct privdata { stream *f; char *buf; + Mapi mid; }; #define READSIZE (1 << 16) @@ -3165,13 +3169,18 @@ getfile(void *data, const char *filename if (state == INTERRUPT) { close_stream(f); priv->f = NULL; + (void) mapi_query_abort(mapi_get_active(priv->mid), 1); return "interrupted"; } s = mnstr_read(f, buf, 1, READSIZE); if (s <= 0) { close_stream(f); priv->f = NULL; - return s < 0 ? "error reading file" : NULL; + if (s < 0) { + (void) mapi_query_abort(mapi_get_active(priv->mid), state == INTERRUPT ? 1 : 2); + return "error reading file"; + } + return NULL; } if (size) *size = (size_t) s; @@ -3198,6 +3207,8 @@ putfile(void *data, const char *filename } } #endif + if (state == INTERRUPT) + goto interrupted; if (buf == NULL || bufsize == 0) return NULL; /* successfully opened file */ } else if (buf == NULL) { @@ -3207,6 +3218,18 @@ putfile(void *data, const char *filename priv->f = NULL; return flush < 0 ? "error writing output" : NULL; } + if (state == INTERRUPT) { + char *filename; + interrupted: + filename = strdup(mnstr_name(priv->f)); + close_stream(priv->f); + priv->f = NULL; + if (filename) { + MT_remove(filename); + free(filename); + } + return "query aborted"; + } if (mnstr_write(priv->f, buf, 1, bufsize) < (ssize_t) bufsize) { close_stream(priv->f); priv->f = NULL; @@ -3716,7 +3739,7 @@ main(int argc, char **argv) } struct privdata priv; - priv = (struct privdata) {0}; + priv = (struct privdata) {.mid = mid}; mapi_setfilecallback2(mid, getfile, putfile, &priv); mapi_trace(mid, trace); diff --git a/clients/mapilib/mapi.c b/clients/mapilib/mapi.c --- a/clients/mapilib/mapi.c +++ b/clients/mapilib/mapi.c @@ -1652,12 +1652,11 @@ mapi_new_handle(Mapi mid) mapi_setError(mid, "Memory allocation failure", __func__, MERROR); return NULL; } + /* initialize and add to doubly-linked list */ *hdl = (struct MapiStatement) { .mid = mid, - .needmore = false, + .next = mid->first, }; - /* add to doubly-linked list */ - hdl->next = mid->first; mid->first = hdl; if (hdl->next) hdl->next->prev = hdl; @@ -2181,8 +2180,7 @@ mapi_disconnect(Mapi mid) * The arguments are: * private - the value of the filecontentprivate argument to * mapi_setfilecallback; - * filename - the file to be written, files are always written as text - * files; + * filename - the file to be written; * binary - if set, the data to be written is binary and the file * should therefore be opened in binary mode, otherwise the * data is UTF-8 encoded text; @@ -2702,8 +2700,10 @@ read_line(Mapi mid) len = mnstr_read(mid->from, mid->blk.buf + mid->blk.end, 1, BLOCK); if (len == -1 && mnstr_errnr(mid->from) == MNSTR_INTERRUPT) { mnstr_clearerr(mid->from); - if (mid->oobintr) + if (mid->oobintr && !mid->active->aborted) { + mid->active->aborted = true; mnstr_putoob(mid->to, 1); + } } else break; } @@ -3239,10 +3239,24 @@ write_file(MapiHdl hdl, char *filename, return; } mnstr_flush(mid->to, MNSTR_FLUSH_DATA); - while ((len = mnstr_read(mid->from, data, 1, sizeof(data))) > 0) { - if (line == NULL) + for (;;) { + len = mnstr_read(mid->from, data, 1, sizeof(data)); + if (len == -1) { + if (mnstr_errnr(mid->from) == MNSTR_INTERRUPT) { + mnstr_clearerr(mid->from); + if (mid->oobintr && !hdl->aborted) { + hdl->aborted = true; + mnstr_putoob(mid->to, 1); + } + } else { + break; + } + } else if (len == 0) { + break; + } else if (line == NULL) { line = mid->putfilecontent(mid->filecontentprivate, NULL, binary, data, len); + } } if (line == NULL) line = mid->putfilecontent(mid->filecontentprivate, @@ -3343,8 +3357,10 @@ read_file(MapiHdl hdl, uint64_t off, cha if (data != NULL && size == 0) { /* some error occurred */ mnstr_clearerr(mid->from); - if (mid->oobintr) + if (mid->oobintr && !hdl->aborted) { + hdl->aborted = true; mnstr_putoob(mid->to, 1); + } } mnstr_flush(mid->to, MNSTR_FLUSH_DATA); line = read_line(mid); @@ -3698,6 +3714,23 @@ mapi_query_done(MapiHdl hdl) } MapiMsg +mapi_query_abort(MapiHdl hdl, int reason) +{ + Mapi mid; + + assert(reason > 0 && reason <= 127); + mapi_hdl_check(hdl); + mid = hdl->mid; + assert(mid->active == NULL || mid->active == hdl); + if (mid->oobintr && !hdl->aborted) { + mnstr_putoob(mid->to, reason); + hdl->aborted = true; + return MOK; + } + return MERROR; +} + +MapiMsg mapi_cache_limit(Mapi mid, int limit) { /* clean out superflous space TODO */ diff --git a/clients/mapilib/mapi.h b/clients/mapilib/mapi.h --- a/clients/mapilib/mapi.h +++ b/clients/mapilib/mapi.h @@ -194,6 +194,8 @@ mapi_export MapiMsg mapi_query_part(Mapi __attribute__((__nonnull__(1))); mapi_export MapiMsg mapi_query_done(MapiHdl hdl) __attribute__((__nonnull__(1))); +mapi_export MapiMsg mapi_query_abort(MapiHdl hdl, int reason) + __attribute__((__nonnull__(1))); mapi_export MapiHdl mapi_send(Mapi mid, const char *cmd) __attribute__((__nonnull__(1))); mapi_export MapiMsg mapi_read_response(MapiHdl hdl) diff --git a/clients/mapilib/mapi_intern.h b/clients/mapilib/mapi_intern.h --- a/clients/mapilib/mapi_intern.h +++ b/clients/mapilib/mapi_intern.h @@ -196,13 +196,14 @@ struct MapiStatement { char *template; /* keep parameterized query text around */ char *query; int maxbindings; + int maxparams; struct MapiBinding *bindings; - int maxparams; struct MapiParam *params; struct MapiResultSet *result, *active, *lastresult; - bool needmore; /* need more input */ int *pending_close; int npending_close; + bool needmore; /* need more input */ + bool aborted; /* this query was aborted */ MapiHdl prev, next; }; diff --git a/monetdb5/modules/mal/tablet.c b/monetdb5/modules/mal/tablet.c --- a/monetdb5/modules/mal/tablet.c +++ b/monetdb5/modules/mal/tablet.c @@ -350,31 +350,6 @@ output_line_lookup(char **buf, size_t *l return 0; } -/* returns TRUE if there is/might be more */ -static bool -tablet_read_more(bstream *in, stream *out, size_t n) -{ - if (out) { - do { - /* query is not finished ask for more */ - /* we need more query text */ - if (bstream_next(in) < 0) - return false; - if (in->eof) { - if (mnstr_write(out, PROMPT2, sizeof(PROMPT2) - 1, 1) == 1) - mnstr_flush(out, MNSTR_FLUSH_DATA); - in->eof = false; - /* we need more query text */ - if (bstream_next(in) <= 0) - return false; - } - } while (in->len <= in->pos); - } else if (bstream_read(in, n) <= 0) { - return false; - } - return true; -} - /* * Fast Load * To speedup the CPU intensive loading of files we have to break @@ -422,7 +397,7 @@ tablet_read_more(bstream *in, stream *ou */ static int -output_file_default(Tablet *as, BAT *order, stream *fd) +output_file_default(Tablet *as, BAT *order, stream *fd, bstream *in) { size_t len = BUFSIZ, locallen = BUFSIZ; int res = 0; @@ -439,10 +414,12 @@ output_file_default(Tablet *as, BAT *ord } for (q = offset + as->nr, p = offset, id = order->hseqbase + offset; p < q; p++, id++) { + if (bstream_getoob(in)) { + res = -5; + break; + } if ((res = output_line(&buf, &len, &localbuf, &locallen, as->format, fd, as->nr_attrs, id)) < 0) { - GDKfree(buf); - GDKfree(localbuf); - return res; + break; } } GDKfree(localbuf); @@ -451,7 +428,7 @@ output_file_default(Tablet *as, BAT *ord } static int -output_file_dense(Tablet *as, stream *fd) +output_file_dense(Tablet *as, stream *fd, bstream *in) { size_t len = BUFSIZ, locallen = BUFSIZ; int res = 0; @@ -465,10 +442,12 @@ output_file_dense(Tablet *as, stream *fd return -1; } for (i = 0; i < as->nr; i++) { + if (bstream_getoob(in)) { + res = -5; /* "Query aborted" */ + break; + } if ((res = output_line_dense(&buf, &len, &localbuf, &locallen, as->format, fd, as->nr_attrs)) < 0) { - GDKfree(buf); - GDKfree(localbuf); - return res; _______________________________________________ checkin-list mailing list -- checkin-list@monetdb.org To unsubscribe send an email to checkin-list-le...@monetdb.org