Changeset: a1679b57a54a for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=a1679b57a54a
Modified Files:
        monetdb5/optimizer/opt_iot.c
        sql/backends/monet5/iot/Tests/webtest.sql
        sql/backends/monet5/iot/basket.c
        sql/backends/monet5/iot/basket.h
        sql/backends/monet5/iot/basket.mal
        sql/backends/monet5/iot/petrinet.c
Branch: iot
Log Message:

React to new baskets and cleanup afterwards.


diffs (165 lines):

diff --git a/monetdb5/optimizer/opt_iot.c b/monetdb5/optimizer/opt_iot.c
--- a/monetdb5/optimizer/opt_iot.c
+++ b/monetdb5/optimizer/opt_iot.c
@@ -164,6 +164,13 @@ OPTiotImplementation(Client cntxt, MalBl
                        if( getModuleId(p)== iotRef && 
getFunctionId(p)==errorRef)
                                noerror++;
                        if (p->token == ENDsymbol && btop > 0 && noerror==0) {
+                               // empty all baskets used
+                               for( j=0; j<btop; j++)
+                               if( done[j]==0) {
+                                       p= newStmt(mb,basketRef,finishRef);
+                                       p= pushStr(mb,p, schemas[j]);
+                                       p= pushStr(mb,p, tables[j]);
+                               }
                                /* catch any exception left behind */
                                r = newAssignment(mb);
                                j = getArg(r, 0) = newVariable(mb, 
GDKstrdup("SQLexception"), TYPE_str);
diff --git a/sql/backends/monet5/iot/Tests/webtest.sql 
b/sql/backends/monet5/iot/Tests/webtest.sql
--- a/sql/backends/monet5/iot/Tests/webtest.sql
+++ b/sql/backends/monet5/iot/Tests/webtest.sql
@@ -1,11 +1,19 @@
 set schema iot;
 create stream table temps( iotclk timestamp, room string , temperature real);
+create table atemps( iotclk timestamp, cnt int , temperature real);
 -- remainder depends on location of the baskets root
 
 declare baskets string;
 set baskets= '/ufs/mk/baskets/measures/temperatures/';
 
 call iot.basket('iot','temps', concat(baskets,'1'));
-select * from temps;
-call iot.basket('iot','temps', concat(baskets,'1'));
-select * from temps;
+create procedure web00()
+begin
+    insert into atemps select min(iotclk), count(*), avg(temperature) from 
temps;
+end;
+
+call iot.query('iot','web00');
+
+select * from  iot.baskets();
+select * from  iot.queries();
+
diff --git a/sql/backends/monet5/iot/basket.c b/sql/backends/monet5/iot/basket.c
--- a/sql/backends/monet5/iot/basket.c
+++ b/sql/backends/monet5/iot/basket.c
@@ -95,6 +95,7 @@ static str
 BSKTnewbasket(sql_schema *s, sql_table *t)
 {
        int i, idx;
+       int colcnt=0;
        node *o;
 
        // Don't introduce the same basket twice
@@ -117,10 +118,10 @@ BSKTnewbasket(sql_schema *s, sql_table *
                        MT_lock_unset(&iotLock);
                        throw(MAL,"baskets.register","Unsupported type %d",tpe);
                }
-               baskets[idx].count++;
+               colcnt++;
        }
        // collect the column names
-       baskets[idx].cols = (str*) GDKzalloc(sizeof(str) * 
(baskets[idx].count+1));
+       baskets[idx].cols = (str*) GDKzalloc(sizeof(str) * (colcnt+1));
        for (i=0, o = t->columns.set->h; o; o = o->next){
         sql_column *col = o->data;
                baskets[idx].cols[i++]=  col->base.name;
@@ -510,6 +511,8 @@ BSKTpushBasket(Client cntxt, MalBlkPtr m
                assert( access (buf,R_OK) == 0);
                //unlink(buf);
        }
+       baskets[bskt].status = BSKTAVAILABLE;
+       baskets[bskt].count = cnt;
 
 recover:
        /* reset all BATs when they are misaligned or error occurred */
@@ -526,6 +529,42 @@ recover:
     (void) mb;
     return msg;
 }
+
+/* remove tuples from a basket according to the sliding policy */
+str
+BSKTfinish(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+    str sch = *getArgReference_str(stk, pci, 1);
+    str tbl = *getArgReference_str(stk, pci, 2);
+       BAT *b;
+       node *n;
+       mvc *m = NULL;
+       str msg;
+       int bskt;
+
+       (void) mb;
+       (void) stk;
+       (void) pci;
+
+       msg= getSQLContext(cntxt,NULL, &m, NULL);
+    bskt = BSKTlocate(sch,tbl);
+       if (bskt == 0)
+               throw(SQL, "iot.finish", "Could not find the basket 
%s.%s",sch,tbl);
+
+       if( msg ==MAL_SUCCEED)
+       /* reset all stream BATs to empty*/
+       for( n = baskets[bskt].table->columns.set->h; n; n= n->next){
+               sql_column *c = n->data;
+               b = store_funcs.bind_col(m->session->tr,c,RDONLY);
+               assert( b );
+               // use the proper basket policy
+               BATsetcount(b,0);
+               BBPunfix(b->batCacheid);
+       }
+       baskets[bskt].count= 0;
+       return msg;
+}
+
 str
 BSKTdump(void *ret)
 {
diff --git a/sql/backends/monet5/iot/basket.h b/sql/backends/monet5/iot/basket.h
--- a/sql/backends/monet5/iot/basket.h
+++ b/sql/backends/monet5/iot/basket.h
@@ -84,7 +84,7 @@ iot_export str BSKTwindow(Client cntxt, 
 
 iot_export str BSKTtable( Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr 
pci);
 iot_export str BSKTtableerrors(bat *nmeId, bat *errorId);
-iot_export str BSKTerror(void  *ret, str *sch, str *fcn, str *msg);
+iot_export str BSKTfinish( Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr 
pci);
 
 //iot_export str BSKTnewbasket(sql_schema *s, sql_table *t);
 iot_export str BSKTdrop(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr 
pci);
diff --git a/sql/backends/monet5/iot/basket.mal 
b/sql/backends/monet5/iot/basket.mal
--- a/sql/backends/monet5/iot/basket.mal
+++ b/sql/backends/monet5/iot/basket.mal
@@ -81,6 +81,10 @@ command reset():void
 address BSKTreset
 comment "Remove all baskets";
 
+pattern finish(sch:str, tbl:str):void
+address BSKTfinish
+comment "Empty the basket using the prevaling policy";
+
 pattern iot.basket(sch:str, tbl:str, dir:str):void
 address BSKTpushBasket
 comment "Push a directory with the binary files";
diff --git a/sql/backends/monet5/iot/petrinet.c 
b/sql/backends/monet5/iot/petrinet.c
--- a/sql/backends/monet5/iot/petrinet.c
+++ b/sql/backends/monet5/iot/petrinet.c
@@ -335,6 +335,7 @@ PNexecute( void *n)
        _DEBUG_PETRINET_ mnstr_printf(PNout, "#petrinet.execute %s.%s 
transition done:%s\n",node->modname, node->fcnname, (msg != 
MAL_SUCCEED?msg:""));
 
        MT_lock_set(&iotLock);
+       // empty the baskets according to their policy
        for ( i=0; i< j &&  node->enabled && node->places[i]; i++) {
                idx = node->places[i];
                baskets[idx].status = BSKTAVAILABLE;
@@ -398,7 +399,7 @@ PNscheduler(void *dummy)
                                        }
                                } else
                                /* consider baskets that are properly filled */
-                               if (baskets[idx].threshold > 
baskets[idx].count){
+                               if (baskets[idx].threshold > baskets[idx].count 
|| baskets[idx].count == 0){
                                        pnet[i].enabled = 0;
                                        break;
                                }
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to