Changeset: a1679b57a54a for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=a1679b57a54a Modified Files: monetdb5/optimizer/opt_iot.c sql/backends/monet5/iot/Tests/webtest.sql sql/backends/monet5/iot/basket.c sql/backends/monet5/iot/basket.h sql/backends/monet5/iot/basket.mal sql/backends/monet5/iot/petrinet.c Branch: iot Log Message:
React to new baskets and cleanup afterwards. diffs (165 lines): diff --git a/monetdb5/optimizer/opt_iot.c b/monetdb5/optimizer/opt_iot.c --- a/monetdb5/optimizer/opt_iot.c +++ b/monetdb5/optimizer/opt_iot.c @@ -164,6 +164,13 @@ OPTiotImplementation(Client cntxt, MalBl if( getModuleId(p)== iotRef && getFunctionId(p)==errorRef) noerror++; if (p->token == ENDsymbol && btop > 0 && noerror==0) { + // empty all baskets used + for( j=0; j<btop; j++) + if( done[j]==0) { + p= newStmt(mb,basketRef,finishRef); + p= pushStr(mb,p, schemas[j]); + p= pushStr(mb,p, tables[j]); + } /* catch any exception left behind */ r = newAssignment(mb); j = getArg(r, 0) = newVariable(mb, GDKstrdup("SQLexception"), TYPE_str); diff --git a/sql/backends/monet5/iot/Tests/webtest.sql b/sql/backends/monet5/iot/Tests/webtest.sql --- a/sql/backends/monet5/iot/Tests/webtest.sql +++ b/sql/backends/monet5/iot/Tests/webtest.sql @@ -1,11 +1,19 @@ set schema iot; create stream table temps( iotclk timestamp, room string , temperature real); +create table atemps( iotclk timestamp, cnt int , temperature real); -- remainder depends on location of the baskets root declare baskets string; set baskets= '/ufs/mk/baskets/measures/temperatures/'; call iot.basket('iot','temps', concat(baskets,'1')); -select * from temps; -call iot.basket('iot','temps', concat(baskets,'1')); -select * from temps; +create procedure web00() +begin + insert into atemps select min(iotclk), count(*), avg(temperature) from temps; +end; + +call iot.query('iot','web00'); + +select * from iot.baskets(); +select * from iot.queries(); + diff --git a/sql/backends/monet5/iot/basket.c b/sql/backends/monet5/iot/basket.c --- a/sql/backends/monet5/iot/basket.c +++ b/sql/backends/monet5/iot/basket.c @@ -95,6 +95,7 @@ static str BSKTnewbasket(sql_schema *s, sql_table *t) { int i, idx; + int colcnt=0; node *o; // Don't introduce the same basket twice @@ -117,10 +118,10 @@ BSKTnewbasket(sql_schema *s, sql_table * MT_lock_unset(&iotLock); throw(MAL,"baskets.register","Unsupported type %d",tpe); } - baskets[idx].count++; + colcnt++; } // collect the column names - baskets[idx].cols = (str*) GDKzalloc(sizeof(str) * (baskets[idx].count+1)); + baskets[idx].cols = (str*) GDKzalloc(sizeof(str) * (colcnt+1)); for (i=0, o = t->columns.set->h; o; o = o->next){ sql_column *col = o->data; baskets[idx].cols[i++]= col->base.name; @@ -510,6 +511,8 @@ BSKTpushBasket(Client cntxt, MalBlkPtr m assert( access (buf,R_OK) == 0); //unlink(buf); } + baskets[bskt].status = BSKTAVAILABLE; + baskets[bskt].count = cnt; recover: /* reset all BATs when they are misaligned or error occurred */ @@ -526,6 +529,42 @@ recover: (void) mb; return msg; } + +/* remove tuples from a basket according to the sliding policy */ +str +BSKTfinish(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) +{ + str sch = *getArgReference_str(stk, pci, 1); + str tbl = *getArgReference_str(stk, pci, 2); + BAT *b; + node *n; + mvc *m = NULL; + str msg; + int bskt; + + (void) mb; + (void) stk; + (void) pci; + + msg= getSQLContext(cntxt,NULL, &m, NULL); + bskt = BSKTlocate(sch,tbl); + if (bskt == 0) + throw(SQL, "iot.finish", "Could not find the basket %s.%s",sch,tbl); + + if( msg ==MAL_SUCCEED) + /* reset all stream BATs to empty*/ + for( n = baskets[bskt].table->columns.set->h; n; n= n->next){ + sql_column *c = n->data; + b = store_funcs.bind_col(m->session->tr,c,RDONLY); + assert( b ); + // use the proper basket policy + BATsetcount(b,0); + BBPunfix(b->batCacheid); + } + baskets[bskt].count= 0; + return msg; +} + str BSKTdump(void *ret) { diff --git a/sql/backends/monet5/iot/basket.h b/sql/backends/monet5/iot/basket.h --- a/sql/backends/monet5/iot/basket.h +++ b/sql/backends/monet5/iot/basket.h @@ -84,7 +84,7 @@ iot_export str BSKTwindow(Client cntxt, iot_export str BSKTtable( Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci); iot_export str BSKTtableerrors(bat *nmeId, bat *errorId); -iot_export str BSKTerror(void *ret, str *sch, str *fcn, str *msg); +iot_export str BSKTfinish( Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci); //iot_export str BSKTnewbasket(sql_schema *s, sql_table *t); iot_export str BSKTdrop(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci); diff --git a/sql/backends/monet5/iot/basket.mal b/sql/backends/monet5/iot/basket.mal --- a/sql/backends/monet5/iot/basket.mal +++ b/sql/backends/monet5/iot/basket.mal @@ -81,6 +81,10 @@ command reset():void address BSKTreset comment "Remove all baskets"; +pattern finish(sch:str, tbl:str):void +address BSKTfinish +comment "Empty the basket using the prevaling policy"; + pattern iot.basket(sch:str, tbl:str, dir:str):void address BSKTpushBasket comment "Push a directory with the binary files"; diff --git a/sql/backends/monet5/iot/petrinet.c b/sql/backends/monet5/iot/petrinet.c --- a/sql/backends/monet5/iot/petrinet.c +++ b/sql/backends/monet5/iot/petrinet.c @@ -335,6 +335,7 @@ PNexecute( void *n) _DEBUG_PETRINET_ mnstr_printf(PNout, "#petrinet.execute %s.%s transition done:%s\n",node->modname, node->fcnname, (msg != MAL_SUCCEED?msg:"")); MT_lock_set(&iotLock); + // empty the baskets according to their policy for ( i=0; i< j && node->enabled && node->places[i]; i++) { idx = node->places[i]; baskets[idx].status = BSKTAVAILABLE; @@ -398,7 +399,7 @@ PNscheduler(void *dummy) } } else /* consider baskets that are properly filled */ - if (baskets[idx].threshold > baskets[idx].count){ + if (baskets[idx].threshold > baskets[idx].count || baskets[idx].count == 0){ pnet[i].enabled = 0; break; } _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list