Changeset: 8a26d4902af5 for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=8a26d4902af5 Modified Files: Branch: default Log Message:
Merge with default diffs (truncated from 472 to 300 lines): diff --git a/monetdb5/modules/mal/Tests/bincopyfrom_int.mal b/monetdb5/modules/mal/Tests/bincopyfrom_int.mal new file mode 100644 index 0000000000000000000000000000000000000000..c9f3f7ce9f0cd3a59dfb1ecee1135ec92f2ed288 GIT binary patch literal 252 zc$_7U!A`_53`Bd*SB$ckE{awQsyGyJSiXTDNES<tNSzXUw+e`VC$zVjXM4s6=oAG? zqR5;o)?7ZNNqD5Fnh3-n(KDoq(u{s3+=LKW9VEl2)mJeG3<$Nw99>H!yj|XL&8$AM zY=Ccbpn~D4U2dR_k}+X?Ei`APQ}7xhU-MG?<BR#RcPbJy+wsxaN$Wy(JUgqqdUjUm lD|p$d#BbUb|3h~(G&7N3c6a_Y4_()pm`_V-ulJ#K`~d@fQR@Hz diff --git a/monetdb5/modules/mal/Tests/bincopyfrom_str.mal b/monetdb5/modules/mal/Tests/bincopyfrom_str.mal new file mode 100644 index 0000000000000000000000000000000000000000..df2b5b515cb6cf8377d7db5528315802e33242c0 GIT binary patch literal 8482 zc%1Fpu}%Xq3<luNnHZRutaQ0br;0)c)QSOCRtDDGVQ~>@a>RxzMLZxIFTy);+zmVd zV&MOzNVb*Oet^tLJ1?XxN-2gy`;;GL2+2|Ks)32)y?2qMi1LoAJ=3!=iRPLlkqdiQ z#LQ_;O&Quy&zaiJ<wZTTX1cz*r8Y%%KFU}a9{Tt|w&M_zB_GG?a9nRpv$@Q?OUlc} zY2#y?`cf^n=GDG%7N4TstZLh7>Dg{hYpYYVI<KwpQ{!YRi$Cyqa~K<blJ6N)c5${^ zznP;Mz;B?xUH||900000000000002sf4Vuo`}jP0Ka778X2z#wpMBJ-S!~T04^>_} diff --git a/monetdb5/modules/mal/remote.mx b/monetdb5/modules/mal/remote.mx --- a/monetdb5/modules/mal/remote.mx +++ b/monetdb5/modules/mal/remote.mx @@ -132,6 +132,20 @@ address RMTbatload comment "create a BAT of the given type and size, and load values from the input stream"; +pattern batbincopy(b:bat):void +address RMTbincopyto +comment "dump BAT b in binary form to the stream"; +pattern batbincopy():bat[:void,:any] +address RMTbincopyfrom +comment "store the binary BAT data in the BBP and return as BAT"; + +pattern bintype():void +address RMTbintype +comment "print the binary type of this mserver5"; + + +# initialise our localtype +remote.prelude(); @{ @h @@ -183,10 +197,19 @@ */ @- Implementation @c + +#define RMTT_L_ENDIAN 0<<1 +#define RMTT_B_ENDIAN 1<<1 +#define RMTT_32_BITS 0<<2 +#define RMTT_64_BITS 1<<2 +#define RMTT_32_OIDS 0<<3 +#define RMTT_64_OIDS 1<<3 + typedef struct _connection { MT_Lock lock; /* lock to avoid interference */ str name; /* the handle for this connection */ Mapi mconn; /* the Mapi handle for the connection */ + unsigned char type; /* binary profile of the connection target */ size_t nextid; /* id counter */ struct _connection *next; /* the next connection in the list */ } *connection; @@ -196,8 +219,10 @@ remote_export str RMTepilogue(int *ret); @c static connection conns = NULL; +static unsigned char localtype = 0; static inline str RMTquery(MapiHdl *ret, str func, Mapi conn, str query); +static inline str RMTinternalcopyfrom(BAT **ret, char *hdr, stream *in); @h remote_export str RMTresolve(int *ret, str *pat); @@ -282,6 +307,7 @@ char conn[BUFSIZ]; char *s; Mapi m; + MapiHdl hdl; /* just make sure the return isn't garbage */ *ret = 0; @@ -298,6 +324,9 @@ if (scen == NULL || *scen == NULL || strcmp(*scen, (str)str_nil) == 0) throw(ILLARG, "remote.connect", ILLEGAL_ARGUMENT ": scenario is " "NULL or nil"); + if (strcmp(*scen, "mal") != 0 && strcmp(*scen, "msql") != 0) + throw(ILLARG, "remote.connect", ILLEGAL_ARGUMENT ": scenation '%s' " + "is not supported", *scen); m = mapi_mapiuri(*ouri, *user, *passwd, *scen); if (mapi_error(m)) @@ -331,6 +360,15 @@ c->next = conns; conns = c; + RMTquery(&hdl, "remote.connect", m, "remote.bintype();"); + if (hdl != NULL && mapi_fetch_row(hdl)) { + char *val = mapi_fetch_field(hdl, 0); + c->type = (unsigned char)atoi(val); + mapi_close_handle(hdl); + } else { + c->type = 0; + } + MT_lock_init(&c->lock, "remote connection lock"); #ifdef _DEBUG_MAPI_ @@ -521,6 +559,24 @@ str RMTprelude(int *ret) { (void)ret; + int type = 0; + +#ifdef WORDS_BIGENDIAN + type |= RMTT_B_ENDIAN; +#else + type |= RMTT_L_ENDIAN; +#endif +#if SIZEOF_SIZE_T == SIZEOF_LONG_LONG + type |= RMTT_64_BITS; +#else + type |= RMTT_32_BITS; +#endif +#if SIZEOF_SIZE_T == SIZEOF_INT || defined(MONET_OID32) + type |= RMTT_32_OIDS; +#else + type |= RMTT_64_OIDS; +#endif + localtype = (unsigned char)type; return(MAL_SUCCEED); } @@ -560,7 +616,7 @@ str conn, ident, tmp, rt; connection c; char qbuf[BUFSIZ + 1]; - MapiHdl mhdl; + MapiHdl mhdl = NULL; int rtype; ValPtr v; @@ -593,7 +649,8 @@ rt, ident); GDKfree(rt); - if (isaBatType(rtype)) { + if (isaBatType(rtype) && (localtype == 0 || localtype != c->type || getHeadType(rtype) != TYPE_void)) + { int h, t, s; ptr l, r; str val, var; @@ -608,9 +665,6 @@ /* this call should be a single transaction over the channel*/ mal_set_lock(c->lock, "remote.get"); -#ifdef _DEBUG_REMOTE - mnstr_printf(cntxt->fdout, "#remote:%s:%s\n", c->name, qbuf); -#endif if ((tmp = RMTquery(&mhdl, "remote.get", c->mconn, qbuf)) != MAL_SUCCEED) { @@ -656,6 +710,43 @@ v->val.bval = b->batCacheid; v->vtype = TYPE_bat; BBPkeepref(b->batCacheid); + } else if (isaBatType(rtype)) { + /* binary compatible remote host, transfer BAT in binary form */ + stream *sout; + stream *sin; + char buf[256]; + ssize_t sz = 0, rd; + str err; + BAT *b = NULL; + + /* this call should be a single transaction over the channel*/ + mal_set_lock(c->lock, "remote.get"); + + /* bypass Mapi from this point to efficiently write all data to + * the server */ + sout = mapi_get_to(c->mconn); + sin = mapi_get_from(c->mconn); + + /* call our remote helper to do this more efficiently */ + mnstr_printf(sout, "remote.batbincopy(%s);\n", ident); + mnstr_flush(sout); + + /* read the JSON header */ + while ((rd = mnstr_read(sin, &buf[sz], 1, 1)) == 1 && buf[sz] != '\n') { + sz += rd; + } + if (rd < 0) + throw(MAL, "remote.get", "could not read BAT JSON header"); + if (buf[0] == '!') + return(GDKstrdup(buf)); + + buf[sz] = '\0'; + if ((err = RMTinternalcopyfrom(&b, buf, sin)) != NULL) + return(err); + + v->val.bval = b->batCacheid; + v->vtype = TYPE_bat; + BBPkeepref(b->batCacheid); } else { ptr p = NULL; str val; @@ -692,7 +783,8 @@ } } - mapi_close_handle(mhdl); + if (mhdl != NULL) + mapi_close_handle(mhdl); mal_unset_lock(c->lock, "remote.get"); return(MAL_SUCCEED); @@ -1099,6 +1191,217 @@ return(MAL_SUCCEED); } +@h +remote_export str RMTbincopyto(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci); +@c +/** + * dump given BAT to stream + */ +str RMTbincopyto(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) +{ + int bid = *(int *)getArgReference(stk, pci, 1); + BAT *b = BBPquickdesc(bid, FALSE); + + (void)mb; + (void)stk; + (void)pci; + + if (b == NULL) + throw(MAL, "remote.bincopyto", RUNTIME_OBJECT_UNDEFINED); + if (b->htype != TYPE_void) + throw(ILLARG, "remote.bincopyto", "only void-headed BATs are supported"); + + mnstr_printf(cntxt->fdout, /*JSON*/"{" + "\"version\":1," + "\"htype\":%d," + "\"ttype\":%d," + "\"seqbase\":" OIDFMT "," + "\"size\":" SZFMT "," + "\"tailsize\":" SZFMT "," + "\"theapsize\":" SZFMT + "}\n", + TYPE_void, + b->ttype, + b->hseqbase == oid_nil ? 0 : b->hseqbase, + BATcount(b), + BATcount(b) * b->T->width, + b->T->varsized ? b->T->vheap->free : 0 + ); + mnstr_write(cntxt->fdout, /* tail */ + b->T->heap.base + (b->U->first * b->T->width), + BATcount(b) * b->T->width, 1); + if (b->T->varsized) + mnstr_write(cntxt->fdout, /* theap */ + b->T->vheap->base, + b->T->vheap->free, 1); + mnstr_flush(cntxt->fdout); + + return(MAL_SUCCEED); +} + +@h +remote_export str RMTbincopyfrom(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci); +@c +typedef struct _binbat_v1 { + int Htype; + int Ttype; + oid seqbase; + size_t size; + size_t tailsize; + size_t theapsize; +} binbat; + +static inline str +RMTinternalcopyfrom(BAT **ret, char *hdr, stream *in) +{ + binbat bb = { 0, 0, 0, 0, 0, 0 }; + char *nme = NULL; + char *val = NULL; + + BAT *b; + + /* hdr is a JSON structure that looks like + * {"version":1,"htype":0,"ttype":6,"seqbase":0,"tailsize":4,"theapsize":0} + * we take the binary data directly from the stream */ + + /* could skip whitespace, but we just don't allow that */ + if (*hdr++ != '{') + throw(MAL, "remote.bincopyfrom", "illegal input, not a JSON header"); + while (*hdr != '\0') { + switch (*hdr) { _______________________________________________ Checkin-list mailing list Checkin-list@monetdb.org http://mail.monetdb.org/mailman/listinfo/checkin-list