Changeset: bd6284dac7c3 for MonetDB URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=bd6284dac7c3 Modified Files: clients/mapilib/mapi.c ctest/tools/monetdbe/example_proxy.c ctest/tools/monetdbe/example_remote.c monetdb5/modules/mal/remote.c monetdb5/modules/mal/remote.h monetdb5/modules/mal/remote.mal sql/backends/monet5/sql.c sql/backends/monet5/sql.mal sql/backends/monet5/sql_execute.c sql/backends/monet5/sql_scenario.c tools/monetdbe/monetdbe.c Branch: monetdbe-proxy Log Message:
MonetDBe now interfaces with remote instead of mapi. diffs (truncated from 659 to 300 lines): diff --git a/clients/mapilib/mapi.c b/clients/mapilib/mapi.c --- a/clients/mapilib/mapi.c +++ b/clients/mapilib/mapi.c @@ -3683,10 +3683,6 @@ mapi_set_columnar_protocol(Mapi mid, boo { if (mid->columnar_protocol == columnar_protocol) return MOK; - if (mid->languageId != LANG_SQL) { - mapi_setError(mid, "columnar_protocol only supported in SQL", __func__, MERROR); - return MERROR; - } mid->columnar_protocol = columnar_protocol; if (columnar_protocol) return mapi_Xcommand(mid, "columnar_protocol", "1"); diff --git a/ctest/tools/monetdbe/example_proxy.c b/ctest/tools/monetdbe/example_proxy.c --- a/ctest/tools/monetdbe/example_proxy.c +++ b/ctest/tools/monetdbe/example_proxy.c @@ -30,7 +30,7 @@ main(void) monetdbe_options opt = {.remote = &remote}; // second argument is a string for the db directory or NULL for in-memory mode - if (monetdbe_open(&mdbe, "mapi:monetdb://127.0.0.1:50001?database=devdb1", &opt)) + if (monetdbe_open(&mdbe, "mapi:monetdb://127.0.0.1:50000?database=devdb", &opt)) error("Failed to open database") // Assumes the existance of a table test (x INT, y STRING) on the remote. diff --git a/ctest/tools/monetdbe/example_remote.c b/ctest/tools/monetdbe/example_remote.c --- a/ctest/tools/monetdbe/example_remote.c +++ b/ctest/tools/monetdbe/example_remote.c @@ -24,10 +24,10 @@ main(void) // second argument is a string for the db directory or NULL for in-memory mode if (monetdbe_open(&mdbe, "monetdb://localhost:5000/sf1?user=monetdb&password=monetdb", NULL)) expected_error("Failed to open database") - if ((err = monetdbe_query(mdbe, "CREATE TABLE test (x integer, y string)", NULL, NULL)) != NULL) + /*if ((err = monetdbe_query(mdbe, "CREATE TABLE test (x integer, y string)", NULL, NULL)) != NULL) error(err) if ((err = monetdbe_query(mdbe, "INSERT INTO test VALUES (42, 'Hello'), (NULL, 'World')", NULL, NULL)) != NULL) - error(err) + error(err)*/ if ((err = monetdbe_query(mdbe, "SELECT x, y FROM test; ", &result, NULL)) != NULL) error(err) diff --git a/monetdb5/modules/mal/remote.c b/monetdb5/modules/mal/remote.c --- a/monetdb5/modules/mal/remote.c +++ b/monetdb5/modules/mal/remote.c @@ -71,7 +71,7 @@ */ #ifdef HAVE_MAPI -static connection conns = NULL; +connection conns = NULL; static unsigned char localtype = 0177; static inline str RMTquery(MapiHdl *ret, const char *func, Mapi conn, const char *query); @@ -151,12 +151,14 @@ static size_t connection_id = 0; * Returns a connection to the given uri. It always returns a newly * created connection. */ +static str RMTconnectScen( str *ret, str *ouri, str *user, str *passwd, - str *scen) + str *scen, + bit columnar) { connection c; char conn[BUFSIZ]; @@ -214,6 +216,11 @@ str RMTconnectScen( return msg; } + if (columnar && (mapi_set_columnar_protocol(m, true) != MOK || mapi_error(m))) { + msg = createException(MAL, "monetdbe.connect", "%s", mapi_error_str(m)); + return msg; + } + /* connection established, add to list */ c = GDKzalloc(sizeof(struct _connection)); if ( c == NULL || (c->name = GDKstrdup(conn)) == NULL) { @@ -251,14 +258,24 @@ str RMTconnectScen( return(MAL_SUCCEED); } -str RMTconnect( - str *ret, - str *uri, - str *user, - str *passwd) -{ - str scen = "mal"; - return RMTconnectScen(ret, uri, user, passwd, &scen); +str RMTconnect(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) { + (void) cntxt; + (void) mb; + str* ret = getArgReference_str(stk, pci, 0); + str* uri = getArgReference_str(stk, pci, 1); + str* user = getArgReference_str(stk, pci, 2); + str* passwd = getArgReference_str(stk, pci, 3); + + str scen = "msql"; + bit columnar = 0; + + if (pci->argc >= 5) + scen = *getArgReference_str(stk, pci, 4); + + if (pci->argc == 6) + columnar = *getArgReference_bit(stk, pci, 5); + + return RMTconnectScen(ret, uri, user, passwd, &scen, columnar); } str @@ -296,7 +313,7 @@ RMTconnectTable(Client cntxt, MalBlkPtr } snprintf(pwhash, pwlen + 2, "\1%s", passwd); - msg = RMTconnectScen(&ret, &uri, &remoteuser, &pwhash, &scen); + msg = RMTconnectScen(&ret, &uri, &remoteuser, &pwhash, &scen, 0); GDKfree(passwd); GDKfree(pwhash); @@ -1017,7 +1034,13 @@ str RMTexec(Client cntxt, MalBlkPtr mb, /* this call should be a single transaction over the channel*/ MT_lock_set(&c->lock); - if(pci->argc - pci->retc < 3) /* conn, mod, func, ... */ + columnar_result_callback* rcb = NULL; + ValRecord *v = &(stk)->stk[(pci)->argv[3]]; + if (pci->retc == 0 && (pci->argc >= 4) && (v->vtype == TYPE_ptr) ) { + rcb = (columnar_result_callback*) v->val.pval; + } + + if(!rcb && pci->argc - pci->retc < 3) /* conn, mod, func, ... */ throw(MAL, "remote.exec", ILLEGAL_ARGUMENT " MAL instruction misses arguments"); len = 0; @@ -1027,15 +1050,19 @@ str RMTexec(Client cntxt, MalBlkPtr mb, len += 2 * (i > 0); len += strlen(*getArgReference_str(stk, pci, i)); } + + const int arg_index = rcb?4:3; + + len += strlen(mod) + strlen(func) + 6; - for (i = 3; i < pci->argc - pci->retc; i++) { - len += 2 * (i > 3); + for (i = arg_index; i < pci->argc - pci->retc; i++) { + len += 2 * (i > arg_index); len += strlen(*getArgReference_str(stk, pci, pci->retc + i)); } len += 2; buflen = len + 1; if ((qbuf = GDKmalloc(buflen)) == NULL) - throw(MAL, "remote.exec", SQLSTATE(HY013) MAL_MALLOC_FAIL); + throw(MAL, "remote.exec", SQLSTATE(HY01arg_index) MAL_MALLOC_FAIL); len = 0; @@ -1049,15 +1076,21 @@ str RMTexec(Client cntxt, MalBlkPtr mb, qbuf[len++] = ')'; /* build the function invocation string in qbuf */ - len += snprintf(&qbuf[len], buflen - len, " := %s.%s(", mod, func); + + if (pci->retc > 0) { + len += snprintf(&qbuf[len], buflen - len, " := %s.%s(", mod, func); + } + else { + len += snprintf(&qbuf[len], buflen - len, " %s.%s(", mod, func); + } /* handle the arguments to the function */ /* put the arguments one by one, and dynamically build the * invocation string */ - for (i = 3; i < pci->argc - pci->retc; i++) { + for (i = arg_index; i < pci->argc - pci->retc; i++) { len += snprintf(&qbuf[len], buflen - len, "%s%s", - (i > 3 ? ", " : ""), + (i > arg_index ? ", " : ""), *(getArgReference_str(stk, pci, pci->retc + i))); } @@ -1066,9 +1099,64 @@ str RMTexec(Client cntxt, MalBlkPtr mb, TRC_DEBUG(MAL_REMOTE, "Remote exec: %s - %s\n", c->name, qbuf); tmp = RMTquery(&mhdl, "remote.exec", c->mconn, qbuf); GDKfree(qbuf); + + /* Temporary hack: + * use a callback to immediately handle columnar results before hdl is destroyed. */ + if(tmp == MAL_SUCCEED && rcb && mhdl) { + + int fields = mapi_get_field_count(mhdl); + + columnar_result* results = GDKzalloc(sizeof(columnar_result) * fields); + + for (int i = 0; i < fields; i++) { + char* s = mapi_get_name(mhdl, i); + if (s == NULL) + s = ""; + printf("%s%s", " ", s); + } + + char buf[256] = {0}; + + stream* sin = mapi_get_from(c->mconn); + + char* tblname = mapi_get_table(mhdl, 0); + + int i; + for (i = 0; i < fields; i++) { + BAT *b = NULL; + + RMTreadbatheader(sin, buf); + RMTinternalcopyfrom(&b, buf, sin); + + if ( b == NULL) { + tmp= createException(MAL,"sql.resultset",SQLSTATE(HY005) "Cannot access column descriptor "); + break; + } + + results[i].id = b->batCacheid; + results[i].colname = mapi_get_name(mhdl, i); + results[i].tpename = mapi_get_type(mhdl, i); + results[i].digits = mapi_get_digits(mhdl, i); + results[i].scale = mapi_get_scale(mhdl, i); + BBPkeepref(results[i].id); + } + + if (tmp != MAL_SUCCEED) { + for (int j = 0; j < i; j++) { + BBPrelease(results[j].id); + } + } + else { + tmp = rcb->call(rcb->context, tblname, results, fields); + } + GDKfree(results); + } + if (mhdl) mapi_close_handle(mhdl); + MT_lock_unset(&c->lock); + return tmp; } @@ -1486,8 +1574,8 @@ mel_func remote_init_funcs[] = { command("remote", "prelude", RMTprelude, false, "initialise the remote module", args(1,1, arg("",void))), command("remote", "epilogue", RMTepilogue, false, "release the resources held by the remote module", args(1,1, arg("",void))), command("remote", "resolve", RMTresolve, false, "resolve a pattern against Merovingian and return the URIs", args(1,2, batarg("",str),arg("pattern",str))), - command("remote", "connect", RMTconnect, false, "returns a newly created connection for uri, using user name and password", args(1,4, arg("",str),arg("uri",str),arg("user",str),arg("passwd",str))), - command("remote", "connect", RMTconnectScen, false, "returns a newly created connection for uri, using user name, password and scenario", args(1,5, arg("",str),arg("uri",str),arg("user",str),arg("passwd",str),arg("scen",str))), + pattern("remote", "connect", RMTconnect, false, "returns a newly created connection for uri, using user name and password", args(1,4, arg("",str),arg("uri",str),arg("user",str),arg("passwd",str))), + pattern("remote", "connect", RMTconnect, false, "returns a newly created connection for uri, using user name, password and scenario", args(1,5, arg("",str),arg("uri",str),arg("user",str),arg("passwd",str),arg("scen",str))), pattern("remote", "connect", RMTconnectTable, false, "return a newly created connection for a table. username and password should be in the vault", args(1,3, arg("",str),arg("table",str),arg("schen",str))), command("remote", "disconnect", RMTdisconnect, false, "disconnects the connection pointed to by handle (received from a call to connect()", args(1,2, arg("",void),arg("conn",str))), pattern("remote", "get", RMTget, false, "retrieves a copy of remote object ident", args(1,3, argany("",0),arg("conn",str),arg("ident",str))), @@ -1497,6 +1585,7 @@ mel_func remote_init_funcs[] = { pattern("remote", "exec", RMTexec, false, "remotely executes <mod>.<func> and returns the handle to its result", args(1,4, vararg("",str),arg("conn",str),arg("mod",str),arg("func",str))), pattern("remote", "exec", RMTexec, false, "remotely executes <mod>.<func> using the argument list of remote objects and returns the handle to its result", args(1,5, arg("",str),arg("conn",str),arg("mod",str),arg("func",str),vararg("",str))), pattern("remote", "exec", RMTexec, false, "remotely executes <mod>.<func> using the argument list of remote objects and returns the handle to its result", args(1,5, vararg("",str),arg("conn",str),arg("mod",str),arg("func",str),vararg("",str))), + pattern("remote", "exec", RMTexec, false, "remotely executes <mod>.<func> using the argument list of remote objects using and applying function pointer rcb as result handling callback.", args(1,5, vararg("",str),arg("conn",str),arg("mod",str),arg("func",str),arg("rcb",ptr))), command("remote", "isalive", RMTisalive, false, "check if conn is still valid and connected", args(1,2, arg("",int),arg("conn",str))), pattern("remote", "batload", RMTbatload, false, "create a BAT of the given type and size, and load values from the input stream", args(1,3, batargany("",1),argany("tt",1),arg("size",int))), pattern("remote", "batbincopy", RMTbincopyto, false, "dump BAT b in binary form to the stream", args(1,2, arg("",void),batargany("b",0))), diff --git a/monetdb5/modules/mal/remote.h b/monetdb5/modules/mal/remote.h --- a/monetdb5/modules/mal/remote.h +++ b/monetdb5/modules/mal/remote.h @@ -26,6 +26,19 @@ #define RMTT_32_OIDS (0<<3) #define RMTT_64_OIDS (1<<3) +typedef struct { + bat id; + char* colname; + char* tpename; + int digits; + int scale; +} columnar_result; + +typedef struct { + void* context; + str (*call) (void* context, char* tblname, columnar_result* columns, size_t nrcolumns); +} columnar_result_callback; + typedef struct _connection { MT_Lock lock; /* lock to avoid interference */ str name; /* the handle for this connection */ @@ -33,6 +46,7 @@ typedef struct _connection { unsigned char type; /* binary profile of the connection target */ size_t nextid; /* id counter */ struct _connection *next; /* the next connection in the list */ + MapiHdl hdl; /* MapiHdl */ } *connection; _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list