Changeset: 8a26d4902af5 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=8a26d4902af5
Modified Files:
        
Branch: default
Log Message:

Merge with default


diffs (truncated from 472 to 300 lines):

diff --git a/monetdb5/modules/mal/Tests/bincopyfrom_int.mal 
b/monetdb5/modules/mal/Tests/bincopyfrom_int.mal
new file mode 100644
index 
0000000000000000000000000000000000000000..c9f3f7ce9f0cd3a59dfb1ecee1135ec92f2ed288
GIT binary patch
literal 252
zc$_7U!A`_53`Bd*SB$ckE{awQsyGyJSiXTDNES<tNSzXUw+e`VC$zVjXM4s6=oAG?
zqR5;o)?7ZNNqD5Fnh3-n(KDoq(u{s3+=LKW9VEl2)mJeG3<$Nw99>H!yj|XL&8$AM
zY=Ccbpn~D4U2dR_k}+X?Ei`APQ}7xhU-MG?<BR#RcPbJy+wsxaN$Wy(JUgqqdUjUm
lD|p$d#BbUb|3h~(G&7N3c6a_Y4_()pm`_V-ulJ#K`~d@fQR@Hz

diff --git a/monetdb5/modules/mal/Tests/bincopyfrom_str.mal 
b/monetdb5/modules/mal/Tests/bincopyfrom_str.mal
new file mode 100644
index 
0000000000000000000000000000000000000000..df2b5b515cb6cf8377d7db5528315802e33242c0
GIT binary patch
literal 8482
zc%1Fpu}%Xq3<luNnHZRutaQ0br;0)c)QSOCRtDDGVQ~>@a>RxzMLZxIFTy);+zmVd
zV&MOzNVb*Oet^tLJ1?XxN-2gy`;;GL2+2|Ks)32)y?2qMi1LoAJ=3!=iRPLlkqdiQ
z#LQ_;O&Quy&zaiJ<wZTTX1cz*r8Y%%KFU}a9{Tt|w&M_zB_GG?a9nRpv$@Q?OUlc}
zY2#y?`cf^n=GDG%7N4TstZLh7>Dg{hYpYYVI<KwpQ{!YRi$Cyqa~K<blJ6N)c5${^
zznP;Mz;B?xUH||900000000000002sf4Vuo`}jP0Ka778X2z#wpMBJ-S!~T04^>_}

diff --git a/monetdb5/modules/mal/remote.mx b/monetdb5/modules/mal/remote.mx
--- a/monetdb5/modules/mal/remote.mx
+++ b/monetdb5/modules/mal/remote.mx
@@ -132,6 +132,20 @@
 address RMTbatload
 comment "create a BAT of the given type and size, and load values from the 
input stream";
 
+pattern batbincopy(b:bat):void
+address RMTbincopyto
+comment "dump BAT b in binary form to the stream";
+pattern batbincopy():bat[:void,:any]
+address RMTbincopyfrom
+comment "store the binary BAT data in the BBP and return as BAT";
+
+pattern bintype():void
+address RMTbintype
+comment "print the binary type of this mserver5";
+
+
+# initialise our localtype
+remote.prelude();
 @{
 @h
 
@@ -183,10 +197,19 @@
 */
 @- Implementation
 @c
+
+#define RMTT_L_ENDIAN   0<<1
+#define RMTT_B_ENDIAN   1<<1
+#define RMTT_32_BITS    0<<2
+#define RMTT_64_BITS    1<<2
+#define RMTT_32_OIDS    0<<3
+#define RMTT_64_OIDS    1<<3
+
 typedef struct _connection {
        MT_Lock            lock;      /* lock to avoid interference */
        str                name;      /* the handle for this connection */
        Mapi               mconn;     /* the Mapi handle for the connection */
+       unsigned char      type;      /* binary profile of the connection 
target */
        size_t             nextid;    /* id counter */
        struct _connection *next;     /* the next connection in the list */
 } *connection;
@@ -196,8 +219,10 @@
 remote_export str RMTepilogue(int *ret);
 @c
 static connection conns = NULL;
+static unsigned char localtype = 0;
 
 static inline str RMTquery(MapiHdl *ret, str func, Mapi conn, str query);
+static inline str RMTinternalcopyfrom(BAT **ret, char *hdr, stream *in);
 
 @h
 remote_export str RMTresolve(int *ret, str *pat);
@@ -282,6 +307,7 @@
        char conn[BUFSIZ];
        char *s;
        Mapi m;
+       MapiHdl hdl;
 
        /* just make sure the return isn't garbage */
        *ret = 0;
@@ -298,6 +324,9 @@
        if (scen == NULL || *scen == NULL || strcmp(*scen, (str)str_nil) == 0)
                throw(ILLARG, "remote.connect", ILLEGAL_ARGUMENT ": scenario is 
"
                                "NULL or nil");
+       if (strcmp(*scen, "mal") != 0 && strcmp(*scen, "msql") != 0)
+               throw(ILLARG, "remote.connect", ILLEGAL_ARGUMENT ": scenation 
'%s' "
+                               "is not supported", *scen);
 
        m = mapi_mapiuri(*ouri, *user, *passwd, *scen);
        if (mapi_error(m))
@@ -331,6 +360,15 @@
        c->next = conns;
        conns = c;
 
+       RMTquery(&hdl, "remote.connect", m, "remote.bintype();");
+       if (hdl != NULL && mapi_fetch_row(hdl)) {
+               char *val = mapi_fetch_field(hdl, 0);
+               c->type = (unsigned char)atoi(val);
+               mapi_close_handle(hdl);
+       } else {
+               c->type = 0;
+       }
+
        MT_lock_init(&c->lock, "remote connection lock");
 
 #ifdef _DEBUG_MAPI_
@@ -521,6 +559,24 @@
 
 str RMTprelude(int *ret) {
        (void)ret;
+       int type = 0;
+       
+#ifdef WORDS_BIGENDIAN
+       type |= RMTT_B_ENDIAN;
+#else
+       type |= RMTT_L_ENDIAN;
+#endif
+#if SIZEOF_SIZE_T == SIZEOF_LONG_LONG
+       type |= RMTT_64_BITS;
+#else
+       type |= RMTT_32_BITS;
+#endif
+#if SIZEOF_SIZE_T == SIZEOF_INT || defined(MONET_OID32)
+       type |= RMTT_32_OIDS;
+#else
+       type |= RMTT_64_OIDS;
+#endif
+       localtype = (unsigned char)type;
 
        return(MAL_SUCCEED);
 }
@@ -560,7 +616,7 @@
        str conn, ident, tmp, rt;
        connection c;
        char qbuf[BUFSIZ + 1];
-       MapiHdl mhdl;
+       MapiHdl mhdl = NULL;
        int rtype;
        ValPtr v;
 
@@ -593,7 +649,8 @@
                        rt, ident);
        GDKfree(rt);
 
-       if (isaBatType(rtype)) {
+       if (isaBatType(rtype) && (localtype == 0 || localtype != c->type || 
getHeadType(rtype) != TYPE_void))
+       {
                int h, t, s;
                ptr l, r;
                str val, var;
@@ -608,9 +665,6 @@
                /* this call should be a single transaction over the channel*/
                mal_set_lock(c->lock, "remote.get"); 
 
-#ifdef _DEBUG_REMOTE
-               mnstr_printf(cntxt->fdout, "#remote:%s:%s\n", c->name, qbuf);
-#endif
                if ((tmp = RMTquery(&mhdl, "remote.get", c->mconn, qbuf))
                                != MAL_SUCCEED)
                {
@@ -656,6 +710,43 @@
                v->val.bval = b->batCacheid;
                v->vtype = TYPE_bat;
                BBPkeepref(b->batCacheid);
+       } else if (isaBatType(rtype)) {
+               /* binary compatible remote host, transfer BAT in binary form */
+               stream *sout;
+               stream *sin;
+               char buf[256];
+               ssize_t sz = 0, rd;
+               str err;
+               BAT *b = NULL;
+
+               /* this call should be a single transaction over the channel*/
+               mal_set_lock(c->lock, "remote.get"); 
+
+               /* bypass Mapi from this point to efficiently write all data to
+                * the server */
+               sout = mapi_get_to(c->mconn);
+               sin = mapi_get_from(c->mconn);
+
+               /* call our remote helper to do this more efficiently */
+               mnstr_printf(sout, "remote.batbincopy(%s);\n", ident);
+               mnstr_flush(sout);
+
+               /* read the JSON header */
+               while ((rd = mnstr_read(sin, &buf[sz], 1, 1)) == 1 && buf[sz] 
!= '\n') {
+                       sz += rd;
+               }
+               if (rd < 0)
+                       throw(MAL, "remote.get", "could not read BAT JSON 
header");
+               if (buf[0] == '!')
+                       return(GDKstrdup(buf));
+
+               buf[sz] = '\0';
+               if ((err = RMTinternalcopyfrom(&b, buf, sin)) != NULL)
+                       return(err);
+
+               v->val.bval = b->batCacheid;
+               v->vtype = TYPE_bat;
+               BBPkeepref(b->batCacheid);
        } else {
                ptr p = NULL;
                str val;
@@ -692,7 +783,8 @@
                }
        }
 
-       mapi_close_handle(mhdl);
+       if (mhdl != NULL)
+               mapi_close_handle(mhdl);
        mal_unset_lock(c->lock, "remote.get"); 
 
        return(MAL_SUCCEED);
@@ -1099,6 +1191,217 @@
        return(MAL_SUCCEED);
 }
 
+@h
+remote_export str RMTbincopyto(Client cntxt, MalBlkPtr mb, MalStkPtr stk, 
InstrPtr pci);
+@c
+/**
+ * dump given BAT to stream
+ */
+str RMTbincopyto(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+       int bid = *(int *)getArgReference(stk, pci, 1);
+       BAT *b = BBPquickdesc(bid, FALSE);
+
+       (void)mb;
+       (void)stk;
+       (void)pci;
+
+       if (b == NULL)
+               throw(MAL, "remote.bincopyto", RUNTIME_OBJECT_UNDEFINED);
+       if (b->htype != TYPE_void)
+               throw(ILLARG, "remote.bincopyto", "only void-headed BATs are 
supported");
+
+       mnstr_printf(cntxt->fdout, /*JSON*/"{"
+                       "\"version\":1,"
+                       "\"htype\":%d,"
+                       "\"ttype\":%d,"
+                       "\"seqbase\":" OIDFMT ","
+                       "\"size\":" SZFMT ","
+                       "\"tailsize\":" SZFMT ","
+                       "\"theapsize\":" SZFMT
+                       "}\n",
+                       TYPE_void,
+                       b->ttype,
+                       b->hseqbase == oid_nil ? 0 : b->hseqbase,
+                       BATcount(b),
+                       BATcount(b) * b->T->width,
+                       b->T->varsized ? b->T->vheap->free : 0
+                       );
+       mnstr_write(cntxt->fdout, /* tail */
+                       b->T->heap.base + (b->U->first * b->T->width),
+                       BATcount(b) * b->T->width, 1);
+       if (b->T->varsized)
+               mnstr_write(cntxt->fdout, /* theap */
+                               b->T->vheap->base,
+                               b->T->vheap->free, 1);
+       mnstr_flush(cntxt->fdout);
+
+       return(MAL_SUCCEED);
+}
+
+@h
+remote_export str RMTbincopyfrom(Client cntxt, MalBlkPtr mb, MalStkPtr stk, 
InstrPtr pci);
+@c
+typedef struct _binbat_v1 {
+       int Htype;
+       int Ttype;
+       oid seqbase;
+       size_t size;
+       size_t tailsize;
+       size_t theapsize;
+} binbat;
+
+static inline str
+RMTinternalcopyfrom(BAT **ret, char *hdr, stream *in)
+{
+       binbat bb = { 0, 0, 0, 0, 0, 0 };
+       char *nme = NULL;
+       char *val = NULL;
+
+       BAT *b;
+
+       /* hdr is a JSON structure that looks like
+        * 
{"version":1,"htype":0,"ttype":6,"seqbase":0,"tailsize":4,"theapsize":0}
+        * we take the binary data directly from the stream */
+
+       /* could skip whitespace, but we just don't allow that */
+       if (*hdr++ != '{')
+               throw(MAL, "remote.bincopyfrom", "illegal input, not a JSON 
header");
+       while (*hdr != '\0') {
+               switch (*hdr) {
_______________________________________________
Checkin-list mailing list
Checkin-list@monetdb.org
http://mail.monetdb.org/mailman/listinfo/checkin-list

Reply via email to