Changeset: 931c38cd87bc for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=931c38cd87bc Modified Files: sql/backends/monet5/iot/Tests/bug05.sql sql/backends/monet5/iot/Tests/iot00.sql sql/backends/monet5/iot/Tests/iot02.sql sql/backends/monet5/iot/Tests/iot06.stable.out sql/backends/monet5/iot/basket.c sql/backends/monet5/iot/petrinet.c sql/backends/monet5/iot/petrinet.h Branch: iot Log Message:
Fixing bounds§ diffs (truncated from 621 to 300 lines): diff --git a/sql/backends/monet5/iot/Tests/bug05.sql b/sql/backends/monet5/iot/Tests/bug05.sql --- a/sql/backends/monet5/iot/Tests/bug05.sql +++ b/sql/backends/monet5/iot/Tests/bug05.sql @@ -17,7 +17,7 @@ INSERT INTO testing VALUES (now(), 2, 2) INSERT INTO testing VALUES (now(), 3, 3); CALL iot.show('sys', 'cquery'); -CALL iot.stop(); +CALL iot.pause(); DROP PROCEDURE cquery; DROP TABLE testout; diff --git a/sql/backends/monet5/iot/Tests/iot00.sql b/sql/backends/monet5/iot/Tests/iot00.sql --- a/sql/backends/monet5/iot/Tests/iot00.sql +++ b/sql/backends/monet5/iot/Tests/iot00.sql @@ -35,7 +35,7 @@ select * from result; --select * from iot.baskets(); --select * from iot.queries(); select * from iot.errors(); -call iot.stop(); +call iot.pause(); drop procedure cq00; drop table stmp; drop table result; diff --git a/sql/backends/monet5/iot/Tests/iot02.sql b/sql/backends/monet5/iot/Tests/iot02.sql --- a/sql/backends/monet5/iot/Tests/iot02.sql +++ b/sql/backends/monet5/iot/Tests/iot02.sql @@ -24,14 +24,15 @@ insert into stmp2 values('2005-09-23 12: call iot.resume('iot','cq02'); -- wait for 5 seconds for handler -call iot.wait(5000); select 'RESULT'; +--call iot.cycles('iot','cq02',4); +call iot.wait(2000); select * from stmp2; select * from result1; select * from result2; -call iot.stop(); +call iot.pause(); select * from iot.errors(); drop procedure cq02; drop table stmp2; diff --git a/sql/backends/monet5/iot/Tests/iot06.stable.out b/sql/backends/monet5/iot/Tests/iot06.stable.out --- a/sql/backends/monet5/iot/Tests/iot06.stable.out +++ b/sql/backends/monet5/iot/Tests/iot06.stable.out @@ -39,24 +39,24 @@ Ready. % clob # type % 63 # length unsafe function iot.cq06():void; - X_1 := sql.mvc(); - X_33 := basket.register(X_1,"iot","tmp06",0); -barrier X_56 := language.dataflow(); - C_2:bat[:oid] := basket.tid(X_1,"iot","tmp06"); - X_5:bat[:timestamp] := basket.bind(X_33,"iot","tmp06","t"); - X_8 := aggr.min(X_5); -exit X_56; - X_9 := sql.append(X_33,"iot","result","t",X_8); - X_11 := aggr.count(X_5); - X_12 := calc.int(X_11); - X_13 := sql.append(X_9,"iot","result","sensor",X_12); - X_15:bat[:int] := basket.bind(X_13,"iot","tmp06","val"); - X_17:bat[:dbl] := batcalc.dbl(2,X_15); - X_19:dbl := aggr.avg(X_17); - X_20 := calc.int(X_19,8,2); - X_22 := sql.append(X_13,"iot","result","val",X_20); - X_34 := basket.tumble(X_22,"iot","tmp06"); - basket.commit(X_34,"iot","tmp06"); + X_0 := sql.mvc(); + X_32 := basket.register(X_0,"iot","tmp06",0); +barrier X_60 := language.dataflow(); + C_1:bat[:oid] := basket.tid(X_0,"iot","tmp06"); + X_4:bat[:timestamp] := basket.bind(X_32,"iot","tmp06","t"); + X_8 := aggr.min(X_4); +exit X_60; + X_10 := sql.append(X_32,"iot","result","t",X_8); + X_12 := aggr.count(X_4); + X_13 := calc.int(X_12); + X_15 := sql.append(X_10,"iot","result","sensor",X_13); + X_17:bat[:int] := basket.bind(X_15,"iot","tmp06","val"); + X_20:bat[:dbl] := batcalc.dbl(2,X_17); + X_24:dbl := aggr.avg(X_20); + X_25 := calc.int(X_24,8,2); + X_28 := sql.append(X_15,"iot","result","val",X_25); + X_36 := basket.tumble(X_28,"iot","tmp06"); + basket.commit(X_36,"iot","tmp06"); catch SQLexception:str; iot.error("user","cq06",SQLexception); exit SQLexception:str; diff --git a/sql/backends/monet5/iot/basket.c b/sql/backends/monet5/iot/basket.c --- a/sql/backends/monet5/iot/basket.c +++ b/sql/backends/monet5/iot/basket.c @@ -352,6 +352,8 @@ BSKTtid(Client cntxt, MalBlkPtr mb, MalS if( bskt == 0) throw(SQL,"basket.bind","Stream table column '%s.%s' not found\n",sch,tbl); b = baskets[bskt].bats[0]; + if( b == 0) + throw(SQL,"basket.bind","Stream table reference column '%s.%s' not accessible\n",sch,tbl); tids = COLnew(0, TYPE_void, 0, TRANSIENT); if (tids == NULL) @@ -464,6 +466,7 @@ BSKTimportInternal(Client cntxt, int bsk assert( b); bcnt = BATcount(b); + if( fsize > 0) switch(ATOMstorage(b->ttype)){ case TYPE_bit: case TYPE_bte: @@ -657,9 +660,9 @@ BSKTexport(Client cntxt, MalBlkPtr mb, M } /* remove tuples from a basket according to the sliding policy */ -#define ColumnShift(B,TPE, STRIDE) { \ +#define ColumnShift(B,TPE, CNT) { \ TPE *first= (TPE*) Tloc(B, 0);\ - TPE *n = first+STRIDE;\ + TPE *n = first+CNT;\ TPE *last= (TPE*) Tloc(B, BUNlast(B));\ for( ; n < last; n++, first++)\ *first=*n;\ @@ -710,7 +713,9 @@ BSKTtumbleInternal(Client cntxt, str sch } if( stride == -1) BATsetcount(b, 0); - else BATsetcount(b, BATcount(b)-cnt); + else + if( BATcount(b) >= cnt) + BATsetcount(b, BATcount(b)-cnt); if( BATcount(b) == 0){ baskets[bskt].status = BSKTWAIT; } @@ -974,8 +979,10 @@ BSKTreset(Client cntxt, MalBlkPtr mb, Ma MT_lock_set(&baskets[idx].lock); for( i=0; baskets[idx].cols[i]; i++){ b = baskets[idx].bats[i]; - if(b) + if(b){ BATsetcount(b,0); + BATsettrivprop(b); + } } baskets[idx].status = BSKTWAIT; MT_lock_unset(&baskets[idx].lock); diff --git a/sql/backends/monet5/iot/petrinet.c b/sql/backends/monet5/iot/petrinet.c --- a/sql/backends/monet5/iot/petrinet.c +++ b/sql/backends/monet5/iot/petrinet.c @@ -63,7 +63,6 @@ typedef struct { str fcnname; MalBlkPtr mb; /* Query block */ MalStkPtr stk; /* might be handy */ - Client client; /* MAL client context for this query */ int status; /* query status waiting/running/paused */ int enabled; /* all baskets are available */ @@ -192,7 +191,9 @@ PNregisterInternal(Client cntxt, MalBlkP Symbol s; char buf[IDLENGTH]; - _DEBUG_PETRINET_ mnstr_printf(GDKout, "#registerInternal status %d\n", init); +#ifdef DEBUG_PETRINET + mnstr_printf(GDKout, "#registerInternal status %d\n", init); +#endif if (pnettop == MAXPN) GDKerror("petrinet.register:Too many transitions"); @@ -218,20 +219,13 @@ PNregisterInternal(Client cntxt, MalBlkP setArgType(nmb,q, 0, TYPE_void); pushEndInstruction(nmb); chkProgram(cntxt->fdout, cntxt->nspace, nmb); - _DEBUG_PETRINET_ printFunction(cntxt->fdout, nmb, 0, LIST_MAL_ALL); +#ifdef DEBUG_PETRINET + printFunction(cntxt->fdout, nmb, 0, LIST_MAL_ALL); +#endif pnet[pnettop].mb = nmb; pnet[pnettop].stk = prepareMALstack(nmb, nmb->vsize); - if(pnet[pnettop].client == NULL) { - pnet[pnettop].client = MCinitClient(0,0,0); - if (pnet[pnettop].client == NULL) - throw(MAL,"petrinet.register","Failed to create client record for continous query\n"); - msg = SQLinitClient(pnet[pnettop].client); - if(msg) - return msg; - } - pnet[pnettop].status = PNWAIT; pnet[pnettop].limit = calls; pnet[pnettop].seen = *timestamp_nil; @@ -266,13 +260,17 @@ PNstatus( Client cntxt, MalBlkPtr mb, Ma throw(SQL,"iot.pause","Continuous query %s.%s not found\n", modname, fcnname); } pnet[i].status = newstatus; - _DEBUG_PETRINET_ mnstr_printf(GDKout, "#scheduler status %s.%s %s\n", modname, fcnname, statusname[newstatus]); +#ifdef DEBUG_PETRINET + mnstr_printf(GDKout, "#scheduler status %s.%s %s\n", modname, fcnname, statusname[newstatus]); +#endif MT_lock_unset(&iotLock); return MAL_SUCCEED; } for ( i = 0; i < pnettop; i++){ pnet[i].status = newstatus; - _DEBUG_PETRINET_ mnstr_printf(GDKout, "#scheduler status %s.%s: %s\n", pnet[i].modname, pnet[i].fcnname, statusname[newstatus]); +#ifdef DEBUG_PETRINET + mnstr_printf(GDKout, "#scheduler status %s.%s: %s\n", pnet[i].modname, pnet[i].fcnname, statusname[newstatus]); +#endif } MT_lock_unset(&iotLock); return MAL_SUCCEED; @@ -280,28 +278,39 @@ PNstatus( Client cntxt, MalBlkPtr mb, Ma str PNresume(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci){ - _DEBUG_PETRINET_ mnstr_printf(GDKout, "#resume scheduler\n"); +#ifdef DEBUG_PETRINET + mnstr_printf(GDKout, "#resume scheduler\n"); +#endif return PNstatus(cntxt, mb, stk, pci, PNWAIT); } str PNpause(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci){ - _DEBUG_PETRINET_ mnstr_printf(GDKout, "#pause scheduler\n"); +#ifdef DEBUG_PETRINET + mnstr_printf(GDKout, "#pause scheduler\n"); +#endif return PNstatus(cntxt, mb, stk, pci, PNPAUSED); } str PNwait(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci){ +#ifdef DEBUG_PETRINET int old = PNcycle; +#endif int delay= *getArgReference_int(stk,pci,1); lng clk = GDKms(); + (void) cntxt; (void) mb; - _DEBUG_PETRINET_ mnstr_printf(cntxt->fdout, "#scheduler wait %d ms\n",delay); +#ifdef DEBUG_PETRINET + mnstr_printf(cntxt->fdout, "#scheduler wait %d ms\n",delay); +#endif delay = delay < PNDELAY? 2*PNDELAY:delay; while( GDKms() < clk + delay ) MT_sleep_ms(PNDELAY); - _DEBUG_PETRINET_ mnstr_printf(cntxt->fdout, "#wait finished after %d cycles\n",PNcycle -old ); +#ifdef DEBUG_PETRINET + mnstr_printf(cntxt->fdout, "#wait finished after %d cycles\n",PNcycle -old ); +#endif return MAL_SUCCEED; } @@ -312,7 +321,6 @@ PNderegisterInternal(int i){ MT_lock_set(&iotLock); GDKfree(pnet[i].modname); GDKfree(pnet[i].fcnname); - //MCcloseClient(pnet[i].client); memset((void*) (pnet+i), 0, sizeof(PNnode)); for( ; i<pnettop-1; i++) pnet[i] = pnet[i+1]; @@ -338,25 +346,30 @@ PNderegister(Client cntxt, MalBlkPtr mb, throw(SQL,"iot.pause","Continuous query %s.%s not found\n", modname, fcnname); } PNderegisterInternal(i); - _DEBUG_PETRINET_ mnstr_printf(GDKout, "#scheduler deregistered %s.%s\n", modname, fcnname); +#ifdef DEBUG_PETRINET + mnstr_printf(GDKout, "#scheduler deregistered %s.%s\n", modname, fcnname); +#endif return MAL_SUCCEED; } for ( i = pnettop-1; i >= 0 ; i--) PNderegisterInternal(i); - _DEBUG_PETRINET_ mnstr_printf(GDKout, "#scheduler deregistered all\n"); +#ifdef DEBUG_PETRINET + mnstr_printf(GDKout, "#scheduler deregistered all\n"); +#endif return MAL_SUCCEED; } /* safely stop the engine by stopping all CQ firt */ str PNstop(void){ - int i, cnt,limit = 20; - _DEBUG_PETRINET_ mnstr_printf(GDKout, "#scheduler being stopped\n"); + int i, cnt,limit = 200; +#ifdef DEBUG_PETRINET + mnstr_printf(GDKout, "#scheduler being stopped\n"); +#endif + MT_lock_set(&iotLock); pnstatus = PNSTOP; // avoid starting new continuous queries - for( i = 0; i < pnettop; i++) - if( pnet[i].client ) - pnet[i].client->itrace ='x'; + MT_lock_unset(&iotLock); _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list