Changeset: 930a4f72e4b4 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/930a4f72e4b4
Modified Files:
        clients/mapiclient/mclient.c
        clients/mapilib/mapi.c
        clients/mapilib/mapi.h
        clients/mapilib/mapi_intern.h
        monetdb5/modules/mal/tablet.c
        monetdb5/modules/mal/tablet.h
        sql/backends/monet5/sql.c
        sql/backends/monet5/sql_result.c
        sql/backends/monet5/sql_scenario.c
        sql/server/sql_parser.y
        sql/server/sql_scan.c
        sql/server/sql_scan.h
Branch: client_interrupts
Log Message:

Implemented abort on interrupt of ON CLIENT processing.
Also implemented abort when ON CLIENT reading fails for some other
reason half way through.


diffs (truncated from 573 to 300 lines):

diff --git a/clients/mapiclient/mclient.c b/clients/mapiclient/mclient.c
--- a/clients/mapiclient/mclient.c
+++ b/clients/mapiclient/mclient.c
@@ -2420,9 +2420,12 @@ doFile(Mapi mid, stream *fp, bool useins
                                mnstr_write(toConsole, "\n", 1, 1);
                                if (hdl) {
                                        /* on interrupt when continuing a 
query, force an error */
-                                       buf[0] = '\200';
-                                       buf[1] = '\n';
-                                       l = 2;
+                                       l = 0;
+                                       if (mapi_query_abort(hdl, 1) != MOK) {
+                                               /* if abort failed, insert 
something not allowed */
+                                               buf[l++] = '\200';
+                                       }
+                                       buf[l++] = '\n';
                                        length = 0;
                                } else {
                                        /* not continuing; just repeat */
@@ -3060,6 +3063,7 @@ doFile(Mapi mid, stream *fp, bool useins
 struct privdata {
        stream *f;
        char *buf;
+       Mapi mid;
 };
 
 #define READSIZE       (1 << 16)
@@ -3165,13 +3169,18 @@ getfile(void *data, const char *filename
        if (state == INTERRUPT) {
                close_stream(f);
                priv->f = NULL;
+               (void) mapi_query_abort(mapi_get_active(priv->mid), 1);
                return "interrupted";
        }
        s = mnstr_read(f, buf, 1, READSIZE);
        if (s <= 0) {
                close_stream(f);
                priv->f = NULL;
-               return s < 0 ? "error reading file" : NULL;
+               if (s < 0) {
+                       (void) mapi_query_abort(mapi_get_active(priv->mid), 
state == INTERRUPT ? 1 : 2);
+                       return "error reading file";
+               }
+               return NULL;
        }
        if (size)
                *size = (size_t) s;
@@ -3198,6 +3207,8 @@ putfile(void *data, const char *filename
                        }
                }
 #endif
+               if (state == INTERRUPT)
+                       goto interrupted;
                if (buf == NULL || bufsize == 0)
                        return NULL; /* successfully opened file */
        } else if (buf == NULL) {
@@ -3207,6 +3218,18 @@ putfile(void *data, const char *filename
                priv->f = NULL;
                return flush < 0 ? "error writing output" : NULL;
        }
+       if (state == INTERRUPT) {
+               char *filename;
+         interrupted:
+               filename = strdup(mnstr_name(priv->f));
+               close_stream(priv->f);
+               priv->f = NULL;
+               if (filename) {
+                       MT_remove(filename);
+                       free(filename);
+               }
+               return "query aborted";
+       }
        if (mnstr_write(priv->f, buf, 1, bufsize) < (ssize_t) bufsize) {
                close_stream(priv->f);
                priv->f = NULL;
@@ -3716,7 +3739,7 @@ main(int argc, char **argv)
        }
 
        struct privdata priv;
-       priv = (struct privdata) {0};
+       priv = (struct privdata) {.mid = mid};
        mapi_setfilecallback2(mid, getfile, putfile, &priv);
 
        mapi_trace(mid, trace);
diff --git a/clients/mapilib/mapi.c b/clients/mapilib/mapi.c
--- a/clients/mapilib/mapi.c
+++ b/clients/mapilib/mapi.c
@@ -1652,12 +1652,11 @@ mapi_new_handle(Mapi mid)
                mapi_setError(mid, "Memory allocation failure", __func__, 
MERROR);
                return NULL;
        }
+       /* initialize and add to doubly-linked list */
        *hdl = (struct MapiStatement) {
                .mid = mid,
-               .needmore = false,
+               .next = mid->first,
        };
-       /* add to doubly-linked list */
-       hdl->next = mid->first;
        mid->first = hdl;
        if (hdl->next)
                hdl->next->prev = hdl;
@@ -2181,8 +2180,7 @@ mapi_disconnect(Mapi mid)
  * The arguments are:
  * private - the value of the filecontentprivate argument to
  *           mapi_setfilecallback;
- * filename - the file to be written, files are always written as text
- *            files;
+ * filename - the file to be written;
  * binary - if set, the data to be written is binary and the file
  *          should therefore be opened in binary mode, otherwise the
  *          data is UTF-8 encoded text;
@@ -2702,8 +2700,10 @@ read_line(Mapi mid)
                        len = mnstr_read(mid->from, mid->blk.buf + 
mid->blk.end, 1, BLOCK);
                        if (len == -1 && mnstr_errnr(mid->from) == 
MNSTR_INTERRUPT) {
                                mnstr_clearerr(mid->from);
-                               if (mid->oobintr)
+                               if (mid->oobintr && !mid->active->aborted) {
+                                       mid->active->aborted = true;
                                        mnstr_putoob(mid->to, 1);
+                               }
                        } else
                                break;
                }
@@ -3239,10 +3239,24 @@ write_file(MapiHdl hdl, char *filename, 
                return;
        }
        mnstr_flush(mid->to, MNSTR_FLUSH_DATA);
-       while ((len = mnstr_read(mid->from, data, 1, sizeof(data))) > 0) {
-               if (line == NULL)
+       for (;;) {
+               len = mnstr_read(mid->from, data, 1, sizeof(data));
+               if (len == -1) {
+                       if (mnstr_errnr(mid->from) == MNSTR_INTERRUPT) {
+                               mnstr_clearerr(mid->from);
+                               if (mid->oobintr && !hdl->aborted) {
+                                       hdl->aborted = true;
+                                       mnstr_putoob(mid->to, 1);
+                               }
+                       } else {
+                               break;
+                       }
+               } else if (len == 0) {
+                       break;
+               } else if (line == NULL) {
                        line = mid->putfilecontent(mid->filecontentprivate,
                                                   NULL, binary, data, len);
+               }
        }
        if (line == NULL)
                line = mid->putfilecontent(mid->filecontentprivate,
@@ -3343,8 +3357,10 @@ read_file(MapiHdl hdl, uint64_t off, cha
        if (data != NULL && size == 0) {
                /* some error occurred */
                mnstr_clearerr(mid->from);
-               if (mid->oobintr)
+               if (mid->oobintr && !hdl->aborted) {
+                       hdl->aborted = true;
                        mnstr_putoob(mid->to, 1);
+               }
        }
        mnstr_flush(mid->to, MNSTR_FLUSH_DATA);
        line = read_line(mid);
@@ -3698,6 +3714,23 @@ mapi_query_done(MapiHdl hdl)
 }
 
 MapiMsg
+mapi_query_abort(MapiHdl hdl, int reason)
+{
+       Mapi mid;
+
+       assert(reason > 0 && reason <= 127);
+       mapi_hdl_check(hdl);
+       mid = hdl->mid;
+       assert(mid->active == NULL || mid->active == hdl);
+       if (mid->oobintr && !hdl->aborted) {
+               mnstr_putoob(mid->to, reason);
+               hdl->aborted = true;
+               return MOK;
+       }
+       return MERROR;
+}
+
+MapiMsg
 mapi_cache_limit(Mapi mid, int limit)
 {
        /* clean out superflous space TODO */
diff --git a/clients/mapilib/mapi.h b/clients/mapilib/mapi.h
--- a/clients/mapilib/mapi.h
+++ b/clients/mapilib/mapi.h
@@ -194,6 +194,8 @@ mapi_export MapiMsg mapi_query_part(Mapi
        __attribute__((__nonnull__(1)));
 mapi_export MapiMsg mapi_query_done(MapiHdl hdl)
        __attribute__((__nonnull__(1)));
+mapi_export MapiMsg mapi_query_abort(MapiHdl hdl, int reason)
+       __attribute__((__nonnull__(1)));
 mapi_export MapiHdl mapi_send(Mapi mid, const char *cmd)
        __attribute__((__nonnull__(1)));
 mapi_export MapiMsg mapi_read_response(MapiHdl hdl)
diff --git a/clients/mapilib/mapi_intern.h b/clients/mapilib/mapi_intern.h
--- a/clients/mapilib/mapi_intern.h
+++ b/clients/mapilib/mapi_intern.h
@@ -196,13 +196,14 @@ struct MapiStatement {
        char *template;         /* keep parameterized query text around */
        char *query;
        int maxbindings;
+       int maxparams;
        struct MapiBinding *bindings;
-       int maxparams;
        struct MapiParam *params;
        struct MapiResultSet *result, *active, *lastresult;
-       bool needmore;          /* need more input */
        int *pending_close;
        int npending_close;
+       bool needmore;          /* need more input */
+       bool aborted;           /* this query was aborted */
        MapiHdl prev, next;
 };
 
diff --git a/monetdb5/modules/mal/tablet.c b/monetdb5/modules/mal/tablet.c
--- a/monetdb5/modules/mal/tablet.c
+++ b/monetdb5/modules/mal/tablet.c
@@ -350,31 +350,6 @@ output_line_lookup(char **buf, size_t *l
        return 0;
 }
 
-/* returns TRUE if there is/might be more */
-static bool
-tablet_read_more(bstream *in, stream *out, size_t n)
-{
-       if (out) {
-               do {
-                       /* query is not finished ask for more */
-                       /* we need more query text */
-                       if (bstream_next(in) < 0)
-                               return false;
-                       if (in->eof) {
-                               if (mnstr_write(out, PROMPT2, sizeof(PROMPT2) - 
1, 1) == 1)
-                                       mnstr_flush(out, MNSTR_FLUSH_DATA);
-                               in->eof = false;
-                               /* we need more query text */
-                               if (bstream_next(in) <= 0)
-                                       return false;
-                       }
-               } while (in->len <= in->pos);
-       } else if (bstream_read(in, n) <= 0) {
-               return false;
-       }
-       return true;
-}
-
 /*
  * Fast Load
  * To speedup the CPU intensive loading of files we have to break
@@ -422,7 +397,7 @@ tablet_read_more(bstream *in, stream *ou
  */
 
 static int
-output_file_default(Tablet *as, BAT *order, stream *fd)
+output_file_default(Tablet *as, BAT *order, stream *fd, bstream *in)
 {
        size_t len = BUFSIZ, locallen = BUFSIZ;
        int res = 0;
@@ -439,10 +414,12 @@ output_file_default(Tablet *as, BAT *ord
        }
        for (q = offset + as->nr, p = offset, id = order->hseqbase + offset; p 
< q;
                 p++, id++) {
+               if (bstream_getoob(in)) {
+                       res = -5;
+                       break;
+               }
                if ((res = output_line(&buf, &len, &localbuf, &locallen, 
as->format, fd, as->nr_attrs, id)) < 0) {
-                       GDKfree(buf);
-                       GDKfree(localbuf);
-                       return res;
+                       break;
                }
        }
        GDKfree(localbuf);
@@ -451,7 +428,7 @@ output_file_default(Tablet *as, BAT *ord
 }
 
 static int
-output_file_dense(Tablet *as, stream *fd)
+output_file_dense(Tablet *as, stream *fd, bstream *in)
 {
        size_t len = BUFSIZ, locallen = BUFSIZ;
        int res = 0;
@@ -465,10 +442,12 @@ output_file_dense(Tablet *as, stream *fd
                return -1;
        }
        for (i = 0; i < as->nr; i++) {
+               if (bstream_getoob(in)) {
+                       res = -5;                       /* "Query aborted" */
+                       break;
+               }
                if ((res = output_line_dense(&buf, &len, &localbuf, &locallen, 
as->format, fd, as->nr_attrs)) < 0) {
-                       GDKfree(buf);
-                       GDKfree(localbuf);
-                       return res;
_______________________________________________
checkin-list mailing list -- checkin-list@monetdb.org
To unsubscribe send an email to checkin-list-le...@monetdb.org

Reply via email to