Changeset: 7b6aebfeecfd for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=7b6aebfeecfd
Modified Files:
	sql/backends/monet5/datacell/basket.mx
Branch: default
Log Message:
Extend with sliding time windows Still to be tested diffs (truncated from 385 to 300 lines): diff --git a/sql/backends/monet5/datacell/basket.mx b/sql/backends/monet5/datacell/basket.mx --- a/sql/backends/monet5/datacell/basket.mx +++ b/sql/backends/monet5/datacell/basket.mx @@ -64,6 +64,10 @@ address BSKTwindow comment "Use a window of N event and slide S afterwards"; +command timewindow{unsafe}(tbl:str, N:int, S:int):bit +address BSKTtimewindow +comment "Use a window of N milliseconds and slide S milliseconds afterwards"; + command beat(tbl:str,N:int):bit address BSKTbeat comment "Set an delay to N milliseconds"; @@ -76,6 +80,10 @@ address BSKTtable comment "Inspect the datacell baskets"; +command errors():bat[:str,:bat] +address BSKTtableerrors +comment "Return the table with all errors"; + command dump() address BSKTdump comment "Dump the status of the basket table"; @@ -108,15 +116,19 @@ MT_Lock lock; str name; /* table that represents the basket */ int threshold ; /* bound to determine scheduling eligibility */ - int winsize, winslide; /* sliding window operations */ + int winsize, winstride; /* sliding window operations */ + lng timeslice, timestride; /* temporal sliding window, determined by first temporal component */ lng beat; /* milliseconds delay */ int colcount; + int port; /* port claimed */ str *cols; BAT **primary; /* statistics */ timestamp seen; int events; /* total number of events grabbed */ int grabs; /* number of grabs */ + /* collected errors */ + BAT *errors; } *BSKTbasket, BSKTbasketRec; datacell_export str schema_default; @@ -132,7 +144,9 @@ datacell_export str BSKTthreshold(int *ret, str *tbl, int *sz); datacell_export str BSKTbeat(int *ret, str *tbl, int *sz); datacell_export str BSKTwindow(int *ret, str *tbl, int *sz, int *slide); +datacell_export str BSKTtimewindow(int *ret, str *tbl, int *sz, int *slide); datacell_export str BSKTtable(int *ret); +datacell_export str BSKTtableerrors(int *ret); datacell_export str BSKTlock(int *ret, str 
*tbl, int *delay); datacell_export str BSKTunlock(int *ret, str *tbl); @@ -260,6 +274,8 @@ for (o = t->columns.set->h; o; o = o->next) baskets[idx].colcount++; baskets[idx].cols = GDKzalloc((baskets[idx].colcount + 1) * sizeof(str)); baskets[idx].primary = GDKzalloc((baskets[idx].colcount + 1) * sizeof(BAT *)); + baskets[idx].errors = BATnew(TYPE_void, TYPE_str, BATTINY); + (void) MTIMEcurrent_timestamp(&baskets[idx].seen); i = 0; for (o = t->columns.set->h; msg == MAL_SUCCEED && o; o = o->next) { @@ -396,12 +412,14 @@ for ( bskt = 0; bskt < bsktLimit; bskt++) if ( baskets[bskt].name){ - mnstr_printf(GDKout, "#baskets[%2d] %s columns %d threshold %d window=[%d,%d] beat %d events " BUNFMT "\n", bskt, + mnstr_printf(GDKout, "#baskets[%2d] %s columns %d threshold %d window=[%d,%d] time window=[%d,%d] beat %d milliseconds events " BUNFMT "\n", bskt, baskets[bskt].name, baskets[bskt].colcount, baskets[bskt].threshold, baskets[bskt].winsize, - baskets[bskt].winslide, + baskets[bskt].winstride, + baskets[bskt].timeslice, + baskets[bskt].timestride, baskets[bskt].beat, (baskets[bskt].primary[0]? 
BATcount(baskets[bskt].primary[0]): 0)); } @@ -413,9 +431,11 @@ BSKTgrab(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) { str tbl; - int bskt, i, *ret; - BAT *b,*bn = 0,*v; + int bskt, i, k, *ret; + BAT *b,*bn = 0, *bo = 0, *bs, *v; int cnt = 0; + timestamp start, finish; + char sbuf[BUFSIZ], *sptr= sbuf, fbuf[BUFSIZ], *fptr= fbuf; (void) cntxt; (void) mb; @@ -427,36 +447,92 @@ if ( baskets[bskt].colcount != pci->retc) throw(MAL,"basket.grab","Incompatible arguments"); - mal_set_lock(baskets[bskt].lock, "lock basket"); - for ( i=0; i < baskets[bskt].colcount; i++) { - ret= (int*) getArgReference(stk,pci,i); - b = baskets[bskt].primary[i]; + if ( baskets[bskt].timeslice){ + /* perform time slicing */ + mal_set_lock(baskets[bskt].lock, "lock basket"); + /* search the first timestamp colum */ + for( k = 0; k< baskets[bskt].colcount; k++) + if ( baskets[bskt].primary[k]->ttype == TYPE_timestamp ) + break; + if ( k == baskets[bskt].colcount) + throw(MAL,"basket.grab","Timestamp column missing"); + + /* collect all tuples that satisfy seen < t < seen+winsize */ + start = baskets[bskt].seen; + MTIMEtimestamp_add(&finish, &start, &baskets[bskt].timeslice); + i = BUFSIZ; + timestamp_tostr(&sptr, &i, &start); + timestamp_tostr(&fptr, &i, &finish); + mnstr_printf(GDKout,"#range %s - %s\n",sbuf,fbuf); + + bo = BATuselect(baskets[bskt].primary[k], &start, &finish); + baskets[bskt].seen = finish; + + /* remove all those before cutoff time from basket */ + MTIMEtimestamp_add(&start, &start, &baskets[bskt].timestride); + finish = *timestamp_nil; + bs = BATselect(baskets[bskt].primary[k], &start, &finish); + + for ( i=0; i < baskets[bskt].colcount; i++) { + ret= (int*) getArgReference(stk,pci,i); + b = baskets[bskt].primary[i]; + if ( BATcount(bo) == 0) + bn = BATnew(b->htype, b->ttype, BATTINY); + else + bn = BATjoin( BATmirror(bo), b, BUN_NONE); + *ret = bn->batCacheid; + BBPkeepref(*ret); + + /* clean out basket */ + bn = BATjoin( BATmirror(bs), b, BUN_NONE); + b = 
BATsetaccess(b, BAT_WRITE); + BATclear(b); + BATins(b,bn, FALSE); + BBPreleaseref(bn->batCacheid); + } + BBPreleaseref(bo->batCacheid); + BBPreleaseref(bs->batCacheid); + + mal_unset_lock(baskets[bskt].lock, "unlock basket"); + } else + if ( baskets[bskt].winsize ) { /* take care of sliding windows */ - if ( baskets[bskt].winsize ){ - /* we may be too early */ - if ( BATcount(b) < (BUN) baskets[bskt].winsize) - break; + mal_set_lock(baskets[bskt].lock, "lock basket"); + for ( i=0; i < baskets[bskt].colcount; i++) { + ret= (int*) getArgReference(stk,pci,i); + b = baskets[bskt].primary[i]; + + /* we may be too early, all BATs are aligned */ + if ( BATcount(b) < (BUN) baskets[bskt].winsize) { + mal_unset_lock(baskets[bskt].lock, "unlock basket"); + throw(MAL,"basket.grab","too early"); + } + bn = BATcopy(b, b->htype, b->ttype,TRUE); - v = BATslice(bn, baskets[bskt].winslide,BATcount(bn)); + v = BATslice(bn, baskets[bskt].winstride,BATcount(bn)); + b = BATsetaccess(b, BAT_WRITE); BATclear(b); - b = BATsetaccess(b, BAT_WRITE); BATins(b,v, FALSE); BATsetcount(bn, baskets[bskt].winsize); cnt = (int) BATcount(bn); BBPunfix(v->batCacheid); - } else { + *ret = bn->batCacheid; + BBPkeepref(*ret); + } + mal_unset_lock(baskets[bskt].lock, "unlock basket"); + } else { + /* straight copy of the basket */ + mal_set_lock(baskets[bskt].lock, "lock basket"); + for ( i=0; i < baskets[bskt].colcount; i++) { + ret= (int*) getArgReference(stk,pci,i); + b = baskets[bskt].primary[i]; bn = BATcopy(b, b->htype, b->ttype,TRUE); BATclear(b); + *ret = bn->batCacheid; + BBPkeepref(*ret); } - - *ret = bn->batCacheid; - BBPkeepref(*ret); - } - mal_unset_lock(baskets[bskt].lock, "unlock basket"); - if ( i != baskets[bskt].colcount) { - if( bn) BBPunfix( bn->batCacheid); - throw(MAL,"basket.grab","too early"); + mal_unset_lock(baskets[bskt].lock, "unlock basket"); } baskets[bskt].grabs++; baskets[bskt].events += cnt; @@ -557,23 +633,47 @@ } str -BSKTwindow(int *ret, str *tbl, int *sz, int *slide) 
+BSKTwindow(int *ret, str *tbl, int *sz, int *stride) { int idx; idx = BSKTlocate(*tbl); if (idx == 0 ) throw(MAL,"basket.window","Basket not found"); - if ( *slide < 0 || *slide > *sz) - throw(MAL,"basket.window","Illegal window slide"); + if ( *stride < 0 || *stride > *sz) + throw(MAL,"basket.window","Illegal window stride"); if ( *sz < 0 ) throw(MAL,"basket.window","Illegal window size"); + if ( baskets[idx].timeslice ) + throw(MAL,"basket.window","Ambiguous sliding window, temporal window size already set"); /* administer the required size */ baskets[idx].winsize = *sz; if ( baskets[idx].threshold < *sz) baskets[idx].threshold = *sz; - baskets[idx].winslide = *slide; + baskets[idx].winstride = *stride; + *ret = TRUE; + return MAL_SUCCEED; +} + +str +BSKTtimewindow(int *ret, str *tbl, int *sz, int *stride) +{ + int idx; + + idx = BSKTlocate(*tbl); + if (idx == 0 ) + throw(MAL,"basket.window","Basket not found"); + if ( *stride < 0 || *stride > *sz) + throw(MAL,"basket.window","Illegal window stride"); + if ( *sz < 0 ) + throw(MAL,"basket.window","Illegal window size"); + if ( baskets[idx].winsize ) + throw(MAL,"basket.window","Ambiguous time window, window size already set"); + + /* administer the required time window size */ + baskets[idx].timeslice = *sz; + baskets[idx].timestride = *stride; *ret = TRUE; return MAL_SUCCEED; } @@ -603,7 +703,8 @@ BSKTtable(int *ret) { BAT *bn, *name, *seen, *events, *grabs; - BAT *threshold, *winsize, *winslide, *beat; + BAT *threshold, *winsize, *winstride, *beat; + BAT *timeslice, *timestride; int i; bn = BATnew(TYPE_str, TYPE_bat, BATTINY); @@ -619,9 +720,9 @@ winsize = BATnew(TYPE_oid,TYPE_int, BATTINY); if ( winsize == 0 ) goto wrapup; BATseqbase(winsize,0); - winslide = BATnew(TYPE_oid,TYPE_int, BATTINY); - if ( winslide == 0 ) goto wrapup; - BATseqbase(winslide,0); + winstride = BATnew(TYPE_oid,TYPE_int, BATTINY); + if ( winstride == 0 ) goto wrapup; + BATseqbase(winstride,0); beat = BATnew(TYPE_oid,TYPE_int, BATTINY); 
if ( beat == 0 ) goto wrapup; BATseqbase(beat,0); @@ -635,21 +736,32 @@ if ( events == 0 ) goto wrapup; BATseqbase(events,0); + timeslice = BATnew(TYPE_oid,TYPE_int, BATTINY); + if ( timeslice == 0 ) goto wrapup; + BATseqbase(timeslice,0); + timestride = BATnew(TYPE_oid,TYPE_int, BATTINY); + if ( timestride == 0 ) goto wrapup; + BATseqbase(timestride,0); + for ( i =1; i < bsktTop; i++) if ( baskets[i].name ) { BUNappend(name, baskets[i].name, FALSE); BUNappend(threshold, &baskets[i].threshold, FALSE); BUNappend(winsize, &baskets[i].winsize, FALSE); - BUNappend(winslide, &baskets[i].winslide, FALSE); _______________________________________________ Checkin-list mailing list Checkin-list@monetdb.org http://mail.monetdb.org/mailman/listinfo/checkin-list