Changeset: bd6284dac7c3 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=bd6284dac7c3
Modified Files:
        clients/mapilib/mapi.c
        ctest/tools/monetdbe/example_proxy.c
        ctest/tools/monetdbe/example_remote.c
        monetdb5/modules/mal/remote.c
        monetdb5/modules/mal/remote.h
        monetdb5/modules/mal/remote.mal
        sql/backends/monet5/sql.c
        sql/backends/monet5/sql.mal
        sql/backends/monet5/sql_execute.c
        sql/backends/monet5/sql_scenario.c
        tools/monetdbe/monetdbe.c
Branch: monetdbe-proxy
Log Message:

MonetDBe now interfaces with remote instead of mapi.


diffs (truncated from 659 to 300 lines):

diff --git a/clients/mapilib/mapi.c b/clients/mapilib/mapi.c
--- a/clients/mapilib/mapi.c
+++ b/clients/mapilib/mapi.c
@@ -3683,10 +3683,6 @@ mapi_set_columnar_protocol(Mapi mid, boo
 {
        if (mid->columnar_protocol == columnar_protocol)
                return MOK;
-       if (mid->languageId != LANG_SQL) {
-               mapi_setError(mid, "columnar_protocol only supported in SQL", 
__func__, MERROR);
-               return MERROR;
-       }
        mid->columnar_protocol = columnar_protocol;
        if (columnar_protocol)
                return mapi_Xcommand(mid, "columnar_protocol", "1");
diff --git a/ctest/tools/monetdbe/example_proxy.c 
b/ctest/tools/monetdbe/example_proxy.c
--- a/ctest/tools/monetdbe/example_proxy.c
+++ b/ctest/tools/monetdbe/example_proxy.c
@@ -30,7 +30,7 @@ main(void)
     monetdbe_options opt = {.remote = &remote};
 
        // second argument is a string for the db directory or NULL for 
in-memory mode
-       if (monetdbe_open(&mdbe, 
"mapi:monetdb://127.0.0.1:50001?database=devdb1", &opt))
+       if (monetdbe_open(&mdbe, 
"mapi:monetdb://127.0.0.1:50000?database=devdb", &opt))
                error("Failed to open database")
 
        // Assumes the existance of a table test (x INT, y STRING)  on the 
remote.
diff --git a/ctest/tools/monetdbe/example_remote.c 
b/ctest/tools/monetdbe/example_remote.c
--- a/ctest/tools/monetdbe/example_remote.c
+++ b/ctest/tools/monetdbe/example_remote.c
@@ -24,10 +24,10 @@ main(void)
        // second argument is a string for the db directory or NULL for 
in-memory mode
        if (monetdbe_open(&mdbe, 
"monetdb://localhost:5000/sf1?user=monetdb&password=monetdb", NULL))
                expected_error("Failed to open database")
-       if ((err = monetdbe_query(mdbe, "CREATE TABLE test (x integer, y 
string)", NULL, NULL)) != NULL)
+       /*if ((err = monetdbe_query(mdbe, "CREATE TABLE test (x integer, y 
string)", NULL, NULL)) != NULL)
                error(err)
        if ((err = monetdbe_query(mdbe, "INSERT INTO test VALUES (42, 'Hello'), 
(NULL, 'World')", NULL, NULL)) != NULL)
-               error(err)
+               error(err)*/
        if ((err = monetdbe_query(mdbe, "SELECT x, y FROM test; ", &result, 
NULL)) != NULL)
                error(err)
 
diff --git a/monetdb5/modules/mal/remote.c b/monetdb5/modules/mal/remote.c
--- a/monetdb5/modules/mal/remote.c
+++ b/monetdb5/modules/mal/remote.c
@@ -71,7 +71,7 @@
  */
 #ifdef HAVE_MAPI
 
-static connection conns = NULL;
+connection conns = NULL;
 static unsigned char localtype = 0177;
 
 static inline str RMTquery(MapiHdl *ret, const char *func, Mapi conn, const 
char *query);
@@ -151,12 +151,14 @@ static size_t connection_id = 0;
  * Returns a connection to the given uri.  It always returns a newly
  * created connection.
  */
+static
 str RMTconnectScen(
                str *ret,
                str *ouri,
                str *user,
                str *passwd,
-               str *scen)
+               str *scen,
+               bit columnar)
 {
        connection c;
        char conn[BUFSIZ];
@@ -214,6 +216,11 @@ str RMTconnectScen(
                return msg;
        }
 
+       if (columnar && (mapi_set_columnar_protocol(m, true) != MOK || 
mapi_error(m))) {
+               msg = createException(MAL, "monetdbe.connect", "%s", 
mapi_error_str(m));
+               return msg;
+       }
+
        /* connection established, add to list */
        c = GDKzalloc(sizeof(struct _connection));
        if ( c == NULL || (c->name = GDKstrdup(conn)) == NULL) {
@@ -251,14 +258,24 @@ str RMTconnectScen(
        return(MAL_SUCCEED);
 }
 
-str RMTconnect(
-               str *ret,
-               str *uri,
-               str *user,
-               str *passwd)
-{
-       str scen = "mal";
-       return RMTconnectScen(ret, uri, user, passwd, &scen);
+str RMTconnect(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) { /* MAL pattern for remote.connect: unpacks the stack arguments and delegates to RMTconnectScen */
+       (void) cntxt; /* not used here */
+       (void) mb; /* not used here */
+               str* ret        = getArgReference_str(stk, pci, 0); /* argument 0: slot that receives the result */
+               str* uri        = getArgReference_str(stk, pci, 1); /* argument 1: uri to connect to */
+               str* user       = getArgReference_str(stk, pci, 2); /* argument 2: user name */
+               str* passwd     = getArgReference_str(stk, pci, 3); /* argument 3: password */
+
+               str scen = "msql"; /* scenario used when the caller passes none */
+               bit columnar = 0; /* columnar protocol is off unless explicitly requested */
+
+               if (pci->argc >= 5) /* optional argument 4: scenario name */
+                       scen = *getArgReference_str(stk, pci, 4);
+               
+               if (pci->argc == 6) /* optional argument 5: columnar flag, read only when exactly six arguments are present */
+                       columnar = *getArgReference_bit(stk, pci, 5);
+
+               return RMTconnectScen(ret, uri, user, passwd, &scen, columnar); /* forwards the error/success string of RMTconnectScen unchanged */
 }
 
 str
@@ -296,7 +313,7 @@ RMTconnectTable(Client cntxt, MalBlkPtr 
        }
        snprintf(pwhash, pwlen + 2, "\1%s", passwd);
 
-       msg = RMTconnectScen(&ret, &uri, &remoteuser, &pwhash, &scen);
+       msg = RMTconnectScen(&ret, &uri, &remoteuser, &pwhash, &scen, 0);
 
        GDKfree(passwd);
        GDKfree(pwhash);
@@ -1017,7 +1034,13 @@ str RMTexec(Client cntxt, MalBlkPtr mb, 
        /* this call should be a single transaction over the channel*/
        MT_lock_set(&c->lock);
 
-       if(pci->argc - pci->retc < 3) /* conn, mod, func, ... */
+       columnar_result_callback* rcb = NULL;
+       ValRecord *v = &(stk)->stk[(pci)->argv[3]];
+       if (pci->retc == 0 && (pci->argc >= 4) && (v->vtype == TYPE_ptr) ) {
+               rcb = (columnar_result_callback*) v->val.pval;
+       }
+
+       if(!rcb && pci->argc - pci->retc < 3) /* conn, mod, func, ... */
                throw(MAL, "remote.exec", ILLEGAL_ARGUMENT  " MAL instruction 
misses arguments");
 
        len = 0;
@@ -1027,15 +1050,19 @@ str RMTexec(Client cntxt, MalBlkPtr mb, 
                len += 2 * (i > 0);
                len += strlen(*getArgReference_str(stk, pci, i));
        }
+
+       const int arg_index = rcb?4:3;
+
+
        len += strlen(mod) + strlen(func) + 6;
-       for (i = 3; i < pci->argc - pci->retc; i++) {
-               len += 2 * (i > 3);
+       for (i = arg_index; i < pci->argc - pci->retc; i++) {
+               len += 2 * (i > arg_index);
                len += strlen(*getArgReference_str(stk, pci, pci->retc + i));
        }
        len += 2;
        buflen = len + 1;
        if ((qbuf = GDKmalloc(buflen)) == NULL)
-               throw(MAL, "remote.exec", SQLSTATE(HY013) MAL_MALLOC_FAIL);
+               throw(MAL, "remote.exec", SQLSTATE(HY01arg_index) 
MAL_MALLOC_FAIL);
 
        len = 0;
 
@@ -1049,15 +1076,21 @@ str RMTexec(Client cntxt, MalBlkPtr mb, 
                qbuf[len++] = ')';
 
        /* build the function invocation string in qbuf */
-       len += snprintf(&qbuf[len], buflen - len, " := %s.%s(", mod, func);
+
+       if (pci->retc > 0) {
+               len += snprintf(&qbuf[len], buflen - len, " := %s.%s(", mod, 
func);
+       }
+       else {
+               len += snprintf(&qbuf[len], buflen - len, " %s.%s(", mod, func);
+       }
 
        /* handle the arguments to the function */
 
        /* put the arguments one by one, and dynamically build the
         * invocation string */
-       for (i = 3; i < pci->argc - pci->retc; i++) {
+       for (i = arg_index; i < pci->argc - pci->retc; i++) {
                len += snprintf(&qbuf[len], buflen - len, "%s%s",
-                               (i > 3 ? ", " : ""),
+                               (i > arg_index ? ", " : ""),
                                *(getArgReference_str(stk, pci, pci->retc + 
i)));
        }
 
@@ -1066,9 +1099,64 @@ str RMTexec(Client cntxt, MalBlkPtr mb, 
        TRC_DEBUG(MAL_REMOTE, "Remote exec: %s - %s\n", c->name, qbuf);
        tmp = RMTquery(&mhdl, "remote.exec", c->mconn, qbuf);
        GDKfree(qbuf);
+
+       /* Temporary hack:
+        * use a callback to immediately handle columnar results before hdl is 
destroyed. */
+       if(tmp == MAL_SUCCEED && rcb && mhdl) {
+
+               int fields = mapi_get_field_count(mhdl);
+
+               columnar_result* results = GDKzalloc(sizeof(columnar_result) * 
fields);         
+
+               for (int i = 0; i < fields; i++) {
+                       char* s = mapi_get_name(mhdl, i);
+                       if (s == NULL)
+                               s = "";
+                       printf("%s%s", " ", s);
+               }
+
+               char buf[256] = {0};
+
+               stream* sin = mapi_get_from(c->mconn);
+
+               char* tblname = mapi_get_table(mhdl, 0);
+
+               int i;
+               for (i = 0; i < fields; i++) {
+                       BAT *b = NULL;
+
+                       RMTreadbatheader(sin, buf);
+                       RMTinternalcopyfrom(&b, buf, sin);
+
+                       if ( b == NULL) {
+                               tmp= 
createException(MAL,"sql.resultset",SQLSTATE(HY005) "Cannot access column 
descriptor ");
+                               break;
+                       }
+
+                       results[i].id = b->batCacheid;
+                       results[i].colname = mapi_get_name(mhdl, i);
+                       results[i].tpename = mapi_get_type(mhdl, i);
+                       results[i].digits = mapi_get_digits(mhdl, i);
+                       results[i].scale = mapi_get_scale(mhdl, i);
+                       BBPkeepref(results[i].id);
+               }
+
+               if (tmp != MAL_SUCCEED) {
+                       for (int j = 0; j < i; j++) {
+                               BBPrelease(results[j].id);
+                       }
+               }
+               else {
+                       tmp = rcb->call(rcb->context, tblname, results, fields);
+               }
+               GDKfree(results);
+       }
+
        if (mhdl)
                mapi_close_handle(mhdl);
+
        MT_lock_unset(&c->lock);
+
        return tmp;
 }
 
@@ -1486,8 +1574,8 @@ mel_func remote_init_funcs[] = {
  command("remote", "prelude", RMTprelude, false, "initialise the remote 
module", args(1,1, arg("",void))),
  command("remote", "epilogue", RMTepilogue, false, "release the resources held 
by the remote module", args(1,1, arg("",void))),
  command("remote", "resolve", RMTresolve, false, "resolve a pattern against 
Merovingian and return the URIs", args(1,2, batarg("",str),arg("pattern",str))),
- command("remote", "connect", RMTconnect, false, "returns a newly created 
connection for uri, using user name and password", args(1,4, 
arg("",str),arg("uri",str),arg("user",str),arg("passwd",str))),
- command("remote", "connect", RMTconnectScen, false, "returns a newly created 
connection for uri, using user name, password and scenario", args(1,5, 
arg("",str),arg("uri",str),arg("user",str),arg("passwd",str),arg("scen",str))),
+ pattern("remote", "connect", RMTconnect, false, "returns a newly created 
connection for uri, using user name and password", args(1,4, 
arg("",str),arg("uri",str),arg("user",str),arg("passwd",str))),
+ pattern("remote", "connect", RMTconnect, false, "returns a newly created 
connection for uri, using user name, password and scenario", args(1,5, 
arg("",str),arg("uri",str),arg("user",str),arg("passwd",str),arg("scen",str))),
  pattern("remote", "connect", RMTconnectTable, false, "return a newly created 
connection for a table. username and password should be in the vault", 
args(1,3, arg("",str),arg("table",str),arg("schen",str))),
  command("remote", "disconnect", RMTdisconnect, false, "disconnects the 
connection pointed to by handle (received from a call to connect()", args(1,2, 
arg("",void),arg("conn",str))),
  pattern("remote", "get", RMTget, false, "retrieves a copy of remote object 
ident", args(1,3, argany("",0),arg("conn",str),arg("ident",str))),
@@ -1497,6 +1585,7 @@ mel_func remote_init_funcs[] = {
  pattern("remote", "exec", RMTexec, false, "remotely executes <mod>.<func> and 
returns the handle to its result", args(1,4, 
vararg("",str),arg("conn",str),arg("mod",str),arg("func",str))),
  pattern("remote", "exec", RMTexec, false, "remotely executes <mod>.<func> 
using the argument list of remote objects and returns the handle to its 
result", args(1,5, 
arg("",str),arg("conn",str),arg("mod",str),arg("func",str),vararg("",str))),
  pattern("remote", "exec", RMTexec, false, "remotely executes <mod>.<func> 
using the argument list of remote objects and returns the handle to its 
result", args(1,5, 
vararg("",str),arg("conn",str),arg("mod",str),arg("func",str),vararg("",str))),
+ pattern("remote", "exec", RMTexec, false, "remotely executes <mod>.<func> 
using the argument list of remote objects using and applying function pointer 
rcb as result handling callback.", args(1,5, 
vararg("",str),arg("conn",str),arg("mod",str),arg("func",str),arg("rcb",ptr))),
  command("remote", "isalive", RMTisalive, false, "check if conn is still valid 
and connected", args(1,2, arg("",int),arg("conn",str))),
  pattern("remote", "batload", RMTbatload, false, "create a BAT of the given 
type and size, and load values from the input stream", args(1,3, 
batargany("",1),argany("tt",1),arg("size",int))),
  pattern("remote", "batbincopy", RMTbincopyto, false, "dump BAT b in binary 
form to the stream", args(1,2, arg("",void),batargany("b",0))),
diff --git a/monetdb5/modules/mal/remote.h b/monetdb5/modules/mal/remote.h
--- a/monetdb5/modules/mal/remote.h
+++ b/monetdb5/modules/mal/remote.h
@@ -26,6 +26,19 @@
 #define RMTT_32_OIDS    (0<<3)
 #define RMTT_64_OIDS    (1<<3)
 
+typedef struct { /* describes one column of a columnar result; RMTexec fills one entry per result field */
+       bat id; /* BAT cache id (b->batCacheid) of the BAT holding the column data */
+       char* colname; /* column name as returned by mapi_get_name */
+       char* tpename; /* type name as returned by mapi_get_type */
+       int digits; /* value returned by mapi_get_digits for this column */
+       int scale; /* value returned by mapi_get_scale for this column */
+} columnar_result;
+
+typedef struct { /* callback bundle passed to remote.exec as a ptr argument to consume columnar results */
+       void* context; /* opaque pointer handed back to call as its first argument */
+       str (*call) (void* context, char* tblname, columnar_result* columns, 
size_t  nrcolumns); /* invoked by RMTexec with the table name, the column array and the field count; its return value becomes RMTexec's result */
+} columnar_result_callback;
+
 typedef struct _connection {
        MT_Lock            lock;      /* lock to avoid interference */
        str                name;      /* the handle for this connection */
@@ -33,6 +46,7 @@ typedef struct _connection {
        unsigned char      type;      /* binary profile of the connection 
target */
        size_t             nextid;    /* id counter */
        struct _connection *next;     /* the next connection in the list */
+       MapiHdl                    hdl;       /* MapiHdl  */
 } *connection;
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to