Changeset: 7b6aebfeecfd for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=7b6aebfeecfd
Modified Files:
        sql/backends/monet5/datacell/basket.mx
Branch: default
Log Message:

Extend baskets with sliding time windows.
Still to be tested.


diffs (truncated from 385 to 300 lines):

diff --git a/sql/backends/monet5/datacell/basket.mx b/sql/backends/monet5/datacell/basket.mx
--- a/sql/backends/monet5/datacell/basket.mx
+++ b/sql/backends/monet5/datacell/basket.mx
@@ -64,6 +64,10 @@
 address BSKTwindow
 comment "Use a window of N event and slide S afterwards";
 
+command timewindow{unsafe}(tbl:str, N:int, S:int):bit
+address BSKTtimewindow
+comment "Use a window of N milliseconds and slide S milliseconds afterwards";
+
 command beat(tbl:str,N:int):bit
 address BSKTbeat
 comment "Set an delay to N milliseconds";
@@ -76,6 +80,10 @@
 address BSKTtable
 comment "Inspect the datacell baskets";
 
+command errors():bat[:str,:bat]
+address BSKTtableerrors
+comment "Return the table with all errors";
+
 command dump()
 address BSKTdump
 comment "Dump the status of the basket table";
@@ -108,15 +116,19 @@
        MT_Lock lock;
        str name;       /* table that represents the basket */
        int threshold ; /* bound to determine scheduling eligibility */
-       int winsize, winslide; /* sliding window operations */
+       int winsize, winstride; /* sliding window operations */
+       lng timeslice, timestride; /* temporal sliding window, determined by 
first temporal component */
        lng beat;       /* milliseconds delay */
        int colcount;
+       int port;       /* port claimed */
        str *cols;
        BAT **primary;  
        /* statistics */
        timestamp seen;
        int events; /* total number of events grabbed */
        int grabs; /* number of grabs */
+       /* collected errors */
+       BAT *errors;
 } *BSKTbasket, BSKTbasketRec;
 
 datacell_export str schema_default;
@@ -132,7 +144,9 @@
 datacell_export str BSKTthreshold(int *ret, str *tbl, int *sz);
 datacell_export str BSKTbeat(int *ret, str *tbl, int *sz);
 datacell_export str BSKTwindow(int *ret, str *tbl, int *sz, int *slide);
+datacell_export str BSKTtimewindow(int *ret, str *tbl, int *sz, int *slide);
 datacell_export str BSKTtable(int *ret);
+datacell_export str BSKTtableerrors(int *ret);
 
 datacell_export str BSKTlock(int *ret, str *tbl, int *delay);
 datacell_export str BSKTunlock(int *ret, str *tbl);
@@ -260,6 +274,8 @@
        for (o = t->columns.set->h; o; o = o->next) baskets[idx].colcount++;
        baskets[idx].cols = GDKzalloc((baskets[idx].colcount + 1) * 
sizeof(str));
        baskets[idx].primary = GDKzalloc((baskets[idx].colcount + 1) * 
sizeof(BAT *));
+       baskets[idx].errors = BATnew(TYPE_void, TYPE_str, BATTINY);
+       (void) MTIMEcurrent_timestamp(&baskets[idx].seen);
 
        i = 0;
        for (o = t->columns.set->h; msg == MAL_SUCCEED && o; o = o->next) {
@@ -396,12 +412,14 @@
 
        for ( bskt = 0; bskt < bsktLimit; bskt++) 
        if ( baskets[bskt].name){
-               mnstr_printf(GDKout, "#baskets[%2d] %s columns %d threshold %d 
window=[%d,%d] beat %d events " BUNFMT "\n", bskt,
+               mnstr_printf(GDKout, "#baskets[%2d] %s columns %d threshold %d 
window=[%d,%d] time window=[%d,%d] beat %d milliseconds events " BUNFMT "\n", 
bskt,
                        baskets[bskt].name,
                        baskets[bskt].colcount,
                        baskets[bskt].threshold,
                        baskets[bskt].winsize,
-                       baskets[bskt].winslide,
+                       baskets[bskt].winstride,
+                       baskets[bskt].timeslice,
+                       baskets[bskt].timestride,
                        baskets[bskt].beat,
                        (baskets[bskt].primary[0]? 
BATcount(baskets[bskt].primary[0]): 0));
        }
@@ -413,9 +431,11 @@
 BSKTgrab(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
 {
        str tbl;
-       int bskt, i, *ret;
-       BAT *b,*bn = 0,*v;
+       int bskt, i, k, *ret;
+       BAT *b,*bn = 0, *bo = 0, *bs, *v;
        int cnt = 0;
+       timestamp start, finish;
+       char sbuf[BUFSIZ], *sptr= sbuf, fbuf[BUFSIZ], *fptr= fbuf;
 
        (void) cntxt;
        (void) mb;
@@ -427,36 +447,92 @@
        if ( baskets[bskt].colcount != pci->retc)
                throw(MAL,"basket.grab","Incompatible arguments");
 
-       mal_set_lock(baskets[bskt].lock, "lock basket");
-       for ( i=0; i < baskets[bskt].colcount; i++) {
-               ret= (int*) getArgReference(stk,pci,i);
-               b = baskets[bskt].primary[i];
+       if ( baskets[bskt].timeslice){
+               /* perform time slicing */
+               mal_set_lock(baskets[bskt].lock, "lock basket");
 
+               /* search the first timestamp colum */
+               for( k = 0; k< baskets[bskt].colcount; k++)
+               if ( baskets[bskt].primary[k]->ttype == TYPE_timestamp )
+                       break;
+               if ( k == baskets[bskt].colcount)
+                       throw(MAL,"basket.grab","Timestamp column missing");
+
+               /* collect all tuples that satisfy seen < t < seen+winsize */
+               start = baskets[bskt].seen;
+               MTIMEtimestamp_add(&finish, &start, &baskets[bskt].timeslice);
+               i = BUFSIZ;
+               timestamp_tostr(&sptr, &i, &start);
+               timestamp_tostr(&fptr, &i, &finish);
+               mnstr_printf(GDKout,"#range %s - %s\n",sbuf,fbuf);
+
+               bo = BATuselect(baskets[bskt].primary[k], &start, &finish);
+               baskets[bskt].seen = finish;
+
+               /* remove all those before cutoff time from basket */
+               MTIMEtimestamp_add(&start, &start, &baskets[bskt].timestride);
+               finish = *timestamp_nil;
+               bs = BATselect(baskets[bskt].primary[k], &start, &finish);
+
+               for ( i=0; i < baskets[bskt].colcount; i++) {
+                       ret= (int*) getArgReference(stk,pci,i);
+                       b = baskets[bskt].primary[i];
+                       if ( BATcount(bo) == 0)
+                               bn = BATnew(b->htype, b->ttype, BATTINY);
+                       else 
+                               bn = BATjoin( BATmirror(bo), b, BUN_NONE);
+                       *ret = bn->batCacheid;
+                       BBPkeepref(*ret);
+
+                       /* clean out basket */
+                       bn = BATjoin( BATmirror(bs), b, BUN_NONE);
+                       b = BATsetaccess(b, BAT_WRITE);
+                       BATclear(b);
+                       BATins(b,bn, FALSE);
+                       BBPreleaseref(bn->batCacheid);
+               }
+               BBPreleaseref(bo->batCacheid);
+               BBPreleaseref(bs->batCacheid);
+
+               mal_unset_lock(baskets[bskt].lock, "unlock basket");
+       } else 
+       if ( baskets[bskt].winsize ) {
                /* take care of sliding windows */
-               if ( baskets[bskt].winsize ){
-                       /* we may be too early */
-                       if ( BATcount(b) < (BUN) baskets[bskt].winsize)
-                               break;
+               mal_set_lock(baskets[bskt].lock, "lock basket");
+               for ( i=0; i < baskets[bskt].colcount; i++) {
+                       ret= (int*) getArgReference(stk,pci,i);
+                       b = baskets[bskt].primary[i];
+
+                       /* we may be too early, all BATs are aligned */
+                       if ( BATcount(b) < (BUN) baskets[bskt].winsize) {
+                               mal_unset_lock(baskets[bskt].lock, "unlock 
basket");
+                               throw(MAL,"basket.grab","too early");
+                       }
+                               
                        bn = BATcopy(b, b->htype, b->ttype,TRUE);
-                       v = BATslice(bn, baskets[bskt].winslide,BATcount(bn));
+                       v = BATslice(bn, baskets[bskt].winstride,BATcount(bn));
+                       b = BATsetaccess(b, BAT_WRITE);
                        BATclear(b);
-                       b = BATsetaccess(b, BAT_WRITE);
                        BATins(b,v, FALSE);
                        BATsetcount(bn, baskets[bskt].winsize);
                        cnt = (int) BATcount(bn);
                        BBPunfix(v->batCacheid);
-               } else {
+                       *ret = bn->batCacheid;
+                       BBPkeepref(*ret);
+               }
+               mal_unset_lock(baskets[bskt].lock, "unlock basket");
+       } else {
+               /* straight copy of the basket */
+               mal_set_lock(baskets[bskt].lock, "lock basket");
+               for ( i=0; i < baskets[bskt].colcount; i++) {
+                       ret= (int*) getArgReference(stk,pci,i);
+                       b = baskets[bskt].primary[i];
                        bn = BATcopy(b, b->htype, b->ttype,TRUE);
                        BATclear(b);
+                       *ret = bn->batCacheid;
+                       BBPkeepref(*ret);
                }
-
-               *ret = bn->batCacheid;
-               BBPkeepref(*ret);
-       }
-       mal_unset_lock(baskets[bskt].lock, "unlock basket");
-       if ( i != baskets[bskt].colcount) {
-               if( bn) BBPunfix( bn->batCacheid);
-               throw(MAL,"basket.grab","too early");
+               mal_unset_lock(baskets[bskt].lock, "unlock basket");
        }
        baskets[bskt].grabs++;
        baskets[bskt].events += cnt;
@@ -557,23 +633,47 @@
 }
 
 str
-BSKTwindow(int *ret, str *tbl, int *sz, int *slide)
+BSKTwindow(int *ret, str *tbl, int *sz, int *stride)
 {
        int idx;
 
        idx = BSKTlocate(*tbl);
        if (idx == 0 )
                throw(MAL,"basket.window","Basket not found");
-       if ( *slide < 0 || *slide > *sz)
-               throw(MAL,"basket.window","Illegal window slide");
+       if ( *stride < 0 || *stride > *sz)
+               throw(MAL,"basket.window","Illegal window stride");
        if ( *sz < 0 )
                throw(MAL,"basket.window","Illegal window size");
+       if ( baskets[idx].timeslice )
+               throw(MAL,"basket.window","Ambiguous sliding window, temporal 
window size already set");
 
        /* administer the required size */
        baskets[idx].winsize = *sz;
        if ( baskets[idx].threshold < *sz)
                baskets[idx].threshold = *sz;
-       baskets[idx].winslide = *slide;
+       baskets[idx].winstride = *stride;
+       *ret = TRUE;
+       return MAL_SUCCEED;
+}
+
+str
+BSKTtimewindow(int *ret, str *tbl, int *sz, int *stride)
+{
+       int idx;
+
+       idx = BSKTlocate(*tbl);
+       if (idx == 0 )
+               throw(MAL,"basket.window","Basket not found");
+       if ( *stride < 0 || *stride > *sz)
+               throw(MAL,"basket.window","Illegal window stride");
+       if ( *sz < 0 )
+               throw(MAL,"basket.window","Illegal window size");
+       if ( baskets[idx].winsize )
+               throw(MAL,"basket.window","Ambiguous time window, window size 
already set");
+
+       /* administer the required time window size */
+       baskets[idx].timeslice = *sz;
+       baskets[idx].timestride = *stride;
        *ret = TRUE;
        return MAL_SUCCEED;
 }
@@ -603,7 +703,8 @@
 BSKTtable(int *ret)
 {
        BAT *bn, *name, *seen, *events, *grabs;
-       BAT *threshold, *winsize, *winslide, *beat;
+       BAT *threshold, *winsize, *winstride, *beat;
+       BAT *timeslice, *timestride;
        int i;
 
        bn = BATnew(TYPE_str, TYPE_bat, BATTINY);
@@ -619,9 +720,9 @@
        winsize = BATnew(TYPE_oid,TYPE_int, BATTINY);
        if ( winsize == 0 ) goto wrapup;
        BATseqbase(winsize,0);
-       winslide = BATnew(TYPE_oid,TYPE_int, BATTINY);
-       if ( winslide == 0 ) goto wrapup;
-       BATseqbase(winslide,0);
+       winstride = BATnew(TYPE_oid,TYPE_int, BATTINY);
+       if ( winstride == 0 ) goto wrapup;
+       BATseqbase(winstride,0);
        beat = BATnew(TYPE_oid,TYPE_int, BATTINY);
        if ( beat == 0 ) goto wrapup;
        BATseqbase(beat,0);
@@ -635,21 +736,32 @@
        if ( events == 0 ) goto wrapup;
        BATseqbase(events,0);
 
+       timeslice = BATnew(TYPE_oid,TYPE_int, BATTINY);
+       if ( timeslice == 0 ) goto wrapup;
+       BATseqbase(timeslice,0);
+       timestride = BATnew(TYPE_oid,TYPE_int, BATTINY);
+       if ( timestride == 0 ) goto wrapup;
+       BATseqbase(timestride,0);
+
        for ( i =1; i < bsktTop; i++) 
        if ( baskets[i].name ) {
                BUNappend(name, baskets[i].name, FALSE);
                BUNappend(threshold, &baskets[i].threshold, FALSE);
                BUNappend(winsize, &baskets[i].winsize, FALSE);
-               BUNappend(winslide, &baskets[i].winslide, FALSE);
_______________________________________________
Checkin-list mailing list
Checkin-list@monetdb.org
http://mail.monetdb.org/mailman/listinfo/checkin-list

Reply via email to