Changeset: bd6284dac7c3 for MonetDB
Modified Files:
Branch: monetdbe-proxy
Log Message:

MonetDBe now interfaces with remote instead of mapi.

diffs (truncated from 659 to 300 lines):

diff --git a/clients/mapilib/mapi.c b/clients/mapilib/mapi.c
--- a/clients/mapilib/mapi.c
+++ b/clients/mapilib/mapi.c
@@ -3683,10 +3683,6 @@ mapi_set_columnar_protocol(Mapi mid, boo
        if (mid->columnar_protocol == columnar_protocol)
                return MOK;
-       if (mid->languageId != LANG_SQL) {
-               mapi_setError(mid, "columnar_protocol only supported in SQL", 
__func__, MERROR);
-               return MERROR;
-       }
        mid->columnar_protocol = columnar_protocol;
        if (columnar_protocol)
                return mapi_Xcommand(mid, "columnar_protocol", "1");
diff --git a/ctest/tools/monetdbe/example_proxy.c 
--- a/ctest/tools/monetdbe/example_proxy.c
+++ b/ctest/tools/monetdbe/example_proxy.c
@@ -30,7 +30,7 @@ main(void)
     monetdbe_options opt = {.remote = &remote};
        // second argument is a string for the db directory or NULL for 
in-memory mode
-       if (monetdbe_open(&mdbe, 
"mapi:monetdb://", &opt))
+       if (monetdbe_open(&mdbe, 
"mapi:monetdb://", &opt))
                error("Failed to open database")
        // Assumes the existance of a table test (x INT, y STRING)  on the 
diff --git a/ctest/tools/monetdbe/example_remote.c 
--- a/ctest/tools/monetdbe/example_remote.c
+++ b/ctest/tools/monetdbe/example_remote.c
@@ -24,10 +24,10 @@ main(void)
        // second argument is a string for the db directory or NULL for 
in-memory mode
        if (monetdbe_open(&mdbe, 
"monetdb://localhost:5000/sf1?user=monetdb&password=monetdb", NULL))
                expected_error("Failed to open database")
-       if ((err = monetdbe_query(mdbe, "CREATE TABLE test (x integer, y 
string)", NULL, NULL)) != NULL)
+       /*if ((err = monetdbe_query(mdbe, "CREATE TABLE test (x integer, y 
string)", NULL, NULL)) != NULL)
        if ((err = monetdbe_query(mdbe, "INSERT INTO test VALUES (42, 'Hello'), 
(NULL, 'World')", NULL, NULL)) != NULL)
-               error(err)
+               error(err)*/
        if ((err = monetdbe_query(mdbe, "SELECT x, y FROM test; ", &result, 
diff --git a/monetdb5/modules/mal/remote.c b/monetdb5/modules/mal/remote.c
--- a/monetdb5/modules/mal/remote.c
+++ b/monetdb5/modules/mal/remote.c
@@ -71,7 +71,7 @@
 #ifdef HAVE_MAPI
-static connection conns = NULL;
+connection conns = NULL;
 static unsigned char localtype = 0177;
 static inline str RMTquery(MapiHdl *ret, const char *func, Mapi conn, const 
char *query);
@@ -151,12 +151,14 @@ static size_t connection_id = 0;
  * Returns a connection to the given uri.  It always returns a newly
  * created connection.
 str RMTconnectScen(
                str *ret,
                str *ouri,
                str *user,
                str *passwd,
-               str *scen)
+               str *scen,
+               bit columnar)
        connection c;
        char conn[BUFSIZ];
@@ -214,6 +216,11 @@ str RMTconnectScen(
                return msg;
+       if (columnar && (mapi_set_columnar_protocol(m, true) != MOK || 
mapi_error(m))) {
+               msg = createException(MAL, "monetdbe.connect", "%s", 
+               return msg;
+       }
        /* connection established, add to list */
        c = GDKzalloc(sizeof(struct _connection));
        if ( c == NULL || (c->name = GDKstrdup(conn)) == NULL) {
@@ -251,14 +258,24 @@ str RMTconnectScen(
-str RMTconnect(
-               str *ret,
-               str *uri,
-               str *user,
-               str *passwd)
-       str scen = "mal";
-       return RMTconnectScen(ret, uri, user, passwd, &scen);
+str RMTconnect(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) {
+       (void) cntxt;
+       (void) mb;
+               str* ret        = getArgReference_str(stk, pci, 0);
+               str* uri        = getArgReference_str(stk, pci, 1);
+               str* user       = getArgReference_str(stk, pci, 2);
+               str* passwd     = getArgReference_str(stk, pci, 3);
+               str scen = "msql";
+               bit columnar = 0;
+               if (pci->argc >= 5)
+                       scen = *getArgReference_str(stk, pci, 4);
+               if (pci->argc == 6)
+                       columnar = *getArgReference_bit(stk, pci, 5);
+               return RMTconnectScen(ret, uri, user, passwd, &scen, columnar);
@@ -296,7 +313,7 @@ RMTconnectTable(Client cntxt, MalBlkPtr 
        snprintf(pwhash, pwlen + 2, "\1%s", passwd);
-       msg = RMTconnectScen(&ret, &uri, &remoteuser, &pwhash, &scen);
+       msg = RMTconnectScen(&ret, &uri, &remoteuser, &pwhash, &scen, 0);
@@ -1017,7 +1034,13 @@ str RMTexec(Client cntxt, MalBlkPtr mb, 
        /* this call should be a single transaction over the channel*/
-       if(pci->argc - pci->retc < 3) /* conn, mod, func, ... */
+       columnar_result_callback* rcb = NULL;
+       ValRecord *v = &(stk)->stk[(pci)->argv[3]];
+       if (pci->retc == 0 && (pci->argc >= 4) && (v->vtype == TYPE_ptr) ) {
+               rcb = (columnar_result_callback*) v->val.pval;
+       }
+       if(!rcb && pci->argc - pci->retc < 3) /* conn, mod, func, ... */
                throw(MAL, "remote.exec", ILLEGAL_ARGUMENT  " MAL instruction 
misses arguments");
        len = 0;
@@ -1027,15 +1050,19 @@ str RMTexec(Client cntxt, MalBlkPtr mb, 
                len += 2 * (i > 0);
                len += strlen(*getArgReference_str(stk, pci, i));
+       const int arg_index = rcb?4:3;
        len += strlen(mod) + strlen(func) + 6;
-       for (i = 3; i < pci->argc - pci->retc; i++) {
-               len += 2 * (i > 3);
+       for (i = arg_index; i < pci->argc - pci->retc; i++) {
+               len += 2 * (i > arg_index);
                len += strlen(*getArgReference_str(stk, pci, pci->retc + i));
        len += 2;
        buflen = len + 1;
        if ((qbuf = GDKmalloc(buflen)) == NULL)
-               throw(MAL, "remote.exec", SQLSTATE(HY013) MAL_MALLOC_FAIL);
+               throw(MAL, "remote.exec", SQLSTATE(HY01arg_index) 
        len = 0;
@@ -1049,15 +1076,21 @@ str RMTexec(Client cntxt, MalBlkPtr mb, 
                qbuf[len++] = ')';
        /* build the function invocation string in qbuf */
-       len += snprintf(&qbuf[len], buflen - len, " := %s.%s(", mod, func);
+       if (pci->retc > 0) {
+               len += snprintf(&qbuf[len], buflen - len, " := %s.%s(", mod, 
+       }
+       else {
+               len += snprintf(&qbuf[len], buflen - len, " %s.%s(", mod, func);
+       }
        /* handle the arguments to the function */
        /* put the arguments one by one, and dynamically build the
         * invocation string */
-       for (i = 3; i < pci->argc - pci->retc; i++) {
+       for (i = arg_index; i < pci->argc - pci->retc; i++) {
                len += snprintf(&qbuf[len], buflen - len, "%s%s",
-                               (i > 3 ? ", " : ""),
+                               (i > arg_index ? ", " : ""),
                                *(getArgReference_str(stk, pci, pci->retc + 
@@ -1066,9 +1099,64 @@ str RMTexec(Client cntxt, MalBlkPtr mb, 
        TRC_DEBUG(MAL_REMOTE, "Remote exec: %s - %s\n", c->name, qbuf);
        tmp = RMTquery(&mhdl, "remote.exec", c->mconn, qbuf);
+       /* Temporary hack:
+        * use a callback to immediately handle columnar results before hdl is 
destroyed. */
+       if(tmp == MAL_SUCCEED && rcb && mhdl) {
+               int fields = mapi_get_field_count(mhdl);
+               columnar_result* results = GDKzalloc(sizeof(columnar_result) * 
+               for (int i = 0; i < fields; i++) {
+                       char* s = mapi_get_name(mhdl, i);
+                       if (s == NULL)
+                               s = "";
+                       printf("%s%s", " ", s);
+               }
+               char buf[256] = {0};
+               stream* sin = mapi_get_from(c->mconn);
+               char* tblname = mapi_get_table(mhdl, 0);
+               int i;
+               for (i = 0; i < fields; i++) {
+                       BAT *b = NULL;
+                       RMTreadbatheader(sin, buf);
+                       RMTinternalcopyfrom(&b, buf, sin);
+                       if ( b == NULL) {
+                               tmp= 
createException(MAL,"sql.resultset",SQLSTATE(HY005) "Cannot access column 
descriptor ");
+                               break;
+                       }
+                       results[i].id = b->batCacheid;
+                       results[i].colname = mapi_get_name(mhdl, i);
+                       results[i].tpename = mapi_get_type(mhdl, i);
+                       results[i].digits = mapi_get_digits(mhdl, i);
+                       results[i].scale = mapi_get_scale(mhdl, i);
+                       BBPkeepref(results[i].id);
+               }
+               if (tmp != MAL_SUCCEED) {
+                       for (int j = 0; j < i; j++) {
+                               BBPrelease(results[j].id);
+                       }
+               }
+               else {
+                       tmp = rcb->call(rcb->context, tblname, results, fields);
+               }
+               GDKfree(results);
+       }
        if (mhdl)
        return tmp;
@@ -1486,8 +1574,8 @@ mel_func remote_init_funcs[] = {
  command("remote", "prelude", RMTprelude, false, "initialise the remote 
module", args(1,1, arg("",void))),
  command("remote", "epilogue", RMTepilogue, false, "release the resources held 
by the remote module", args(1,1, arg("",void))),
  command("remote", "resolve", RMTresolve, false, "resolve a pattern against 
Merovingian and return the URIs", args(1,2, batarg("",str),arg("pattern",str))),
- command("remote", "connect", RMTconnect, false, "returns a newly created 
connection for uri, using user name and password", args(1,4, 
- command("remote", "connect", RMTconnectScen, false, "returns a newly created 
connection for uri, using user name, password and scenario", args(1,5, 
+ pattern("remote", "connect", RMTconnect, false, "returns a newly created 
connection for uri, using user name and password", args(1,4, 
+ pattern("remote", "connect", RMTconnect, false, "returns a newly created 
connection for uri, using user name, password and scenario", args(1,5, 
  pattern("remote", "connect", RMTconnectTable, false, "return a newly created 
connection for a table. username and password should be in the vault", 
args(1,3, arg("",str),arg("table",str),arg("schen",str))),
  command("remote", "disconnect", RMTdisconnect, false, "disconnects the 
connection pointed to by handle (received from a call to connect()", args(1,2, 
  pattern("remote", "get", RMTget, false, "retrieves a copy of remote object 
ident", args(1,3, argany("",0),arg("conn",str),arg("ident",str))),
@@ -1497,6 +1585,7 @@ mel_func remote_init_funcs[] = {
  pattern("remote", "exec", RMTexec, false, "remotely executes <mod>.<func> and 
returns the handle to its result", args(1,4, 
  pattern("remote", "exec", RMTexec, false, "remotely executes <mod>.<func> 
using the argument list of remote objects and returns the handle to its 
result", args(1,5, 
  pattern("remote", "exec", RMTexec, false, "remotely executes <mod>.<func> 
using the argument list of remote objects and returns the handle to its 
result", args(1,5, 
+ pattern("remote", "exec", RMTexec, false, "remotely executes <mod>.<func> 
using the argument list of remote objects using and applying function pointer 
rcb as result handling callback.", args(1,5, 
  command("remote", "isalive", RMTisalive, false, "check if conn is still valid 
and connected", args(1,2, arg("",int),arg("conn",str))),
  pattern("remote", "batload", RMTbatload, false, "create a BAT of the given 
type and size, and load values from the input stream", args(1,3, 
  pattern("remote", "batbincopy", RMTbincopyto, false, "dump BAT b in binary 
form to the stream", args(1,2, arg("",void),batargany("b",0))),
diff --git a/monetdb5/modules/mal/remote.h b/monetdb5/modules/mal/remote.h
--- a/monetdb5/modules/mal/remote.h
+++ b/monetdb5/modules/mal/remote.h
@@ -26,6 +26,19 @@
 #define RMTT_32_OIDS    (0<<3)
 #define RMTT_64_OIDS    (1<<3)
+typedef struct {
+       bat id;
+       char* colname;
+       char* tpename;
+       int digits;
+       int scale;
+} columnar_result;
+typedef struct {
+       void* context;
+       str (*call) (void* context, char* tblname, columnar_result* columns, 
size_t  nrcolumns);
+} columnar_result_callback;
 typedef struct _connection {
        MT_Lock            lock;      /* lock to avoid interference */
        str                name;      /* the handle for this connection */
@@ -33,6 +46,7 @@ typedef struct _connection {
        unsigned char      type;      /* binary profile of the connection 
target */
        size_t             nextid;    /* id counter */
        struct _connection *next;     /* the next connection in the list */
+       MapiHdl                    hdl;       /* MapiHdl  */
 } *connection;
