Changeset: ed10194a1320 for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=ed10194a1320 Modified Files: sql/backends/monet5/iot/basket.c sql/backends/monet5/iot/petrinet.c Branch: iot Log Message:
Add window strides First step towards slowly moving windows based on row count and time diffs (129 lines): diff --git a/sql/backends/monet5/iot/basket.c b/sql/backends/monet5/iot/basket.c --- a/sql/backends/monet5/iot/basket.c +++ b/sql/backends/monet5/iot/basket.c @@ -26,7 +26,7 @@ */ #include "monetdb_config.h" -#include <gdk.h> +#include "gdk.h" #include "iot.h" #include "basket.h" #include "mal_exception.h" @@ -36,7 +36,7 @@ //#define _DEBUG_BASKET_ if(0) #define _DEBUG_BASKET_ -str statusname[4] = { "<unknown>", "running", "paused", "locked" }; +str statusname[4] = { "<unknown>", "active", "paused", "locked" }; BasketRec *baskets; /* the global iot catalog */ static int bsktTop = 0, bsktLimit = 0; @@ -531,12 +531,21 @@ recover: } /* remove tuples from a basket according to the sliding policy */ +#define ColumnShift(B,TPE, STRIDE) { \ + TPE *first= (TPE*) Tloc(B, BUNfirst(B));\ + TPE *n = first+STRIDE;\ + TPE *last= (TPE*) Tloc(B, BUNlast(B));\ + for( ; n < last; n++, first++)\ + *first=*n;\ +} + str BSKTfinish(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) { str sch = *getArgReference_str(stk, pci, 1); str tbl = *getArgReference_str(stk, pci, 2); BAT *b; + BUN cnt, stride; node *n; mvc *m = NULL; str msg; @@ -547,12 +556,71 @@ BSKTfinish(Client cntxt, MalBlkPtr mb, M (void) pci; msg= getSQLContext(cntxt,NULL, &m, NULL); + if( msg ) + throw(SQL,"iot.finish","Missing SQL context"); bskt = BSKTlocate(sch,tbl); if (bskt == 0) throw(SQL, "iot.finish", "Could not find the basket %s.%s",sch,tbl); - if( msg ==MAL_SUCCEED) - /* reset all stream BATs to empty*/ + /* window stride option */ + if( baskets[bskt].winsize && (stride =baskets[bskt].winstride)){ +shiftcolumns: + for( n = baskets[bskt].table->columns.set->h; n; n= n->next){ + sql_column *c = n->data; + b = store_funcs.bind_col(m->session->tr,c,RDONLY); + assert( b ); + cnt=BATcount(b); + if( cnt < stride) + break; + + switch(ATOMstorage(b->ttype)){ + case TYPE_bit:ColumnShift(b,bit,stride); break; + case TYPE_bte:ColumnShift(b,bte,stride); break; + case TYPE_sht:ColumnShift(b,sht,stride); break; + case TYPE_int:ColumnShift(b,int,stride); break; + case TYPE_oid:ColumnShift(b,oid,stride); break; + case TYPE_flt:ColumnShift(b,flt,stride); break; + case TYPE_dbl:ColumnShift(b,dbl,stride); break; + case TYPE_lng:ColumnShift(b,lng,stride); break; +#ifdef HAVE_HGE + case TYPE_hge:ColumnShift(b,hge,stride); break; +#endif + case TYPE_str: + switch(b->T->width){ + case 1: ColumnShift(b,bte,stride); break; + case 2: ColumnShift(b,sht,stride); break; + case 4: ColumnShift(b,int,stride); break; + case 8: ColumnShift(b,lng,stride); break; + } + break; + default: break; + } + BATsetcount(b, BATcount(b)-stride); + BBPunfix(b->batCacheid); + } + return MAL_SUCCEED; + } + + /* time stride option, prepare a new stride based on the leading 'iotclk' */ + if( baskets[bskt].timeslice && baskets[bskt].timestride){ + sql_column *c; + lng *first, *last, stop; + n = baskets[bskt].table->columns.set->h; + c = n->data; + b = store_funcs.bind_col(m->session->tr,c,RDONLY); + assert( b ); + if( b->ttype !=TYPE_timestamp) + throw(SQL, "iot.finish", "Could not find the leading 'iotclk' in %s.%s",sch,tbl); + first= (lng*) Tloc(b, BUNfirst(b)); + last = (lng*) Tloc(b, BUNlast(b)); + stride =0; + stop = *first + baskets[bskt].timestride; + for( ; first < last; first++) + if (*first >stop) break; + goto shiftcolumns; + } + + /* default action: reset all stream BATs to empty*/ for( n = baskets[bskt].table->columns.set->h; n; n= n->next){ sql_column *c = n->data; b = store_funcs.bind_col(m->session->tr,c,RDONLY); diff --git a/sql/backends/monet5/iot/petrinet.c b/sql/backends/monet5/iot/petrinet.c --- a/sql/backends/monet5/iot/petrinet.c +++ b/sql/backends/monet5/iot/petrinet.c @@ -48,7 +48,7 @@ #define MAXPN 200 /* it is the minimum, if we need more space GDKrealloc */ -static str statusname[6] = { "<unknown>", "running", "paused"}; +static str statusname[6] = { "<unknown>", "active", "paused"}; static void PNstartScheduler(void); _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list