Changeset: 391b3bd2e7b5 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=391b3bd2e7b5
Modified Files:
    clients/Tests/exports.stable.out
    clients/Tests/malcheck.stable.out
    sql/backends/monet5/iot/50_iot.sql
    sql/backends/monet5/iot/Tests/iot00.sql
    sql/backends/monet5/iot/basket.c
    sql/backends/monet5/iot/basket.h
    sql/backends/monet5/iot/basket.mal
    sql/backends/monet5/iot/iot.c
    sql/backends/monet5/iot/iot.h
    sql/backends/monet5/iot/iot.mal
    sql/backends/monet5/iot/petrinet.c
    sql/backends/monet5/iot/petrinet.h
    sql/backends/monet5/iot/petrinet.mal
    sql/backends/monet5/sql_optimizer.c
    sql/test/BugTracker-2016/Tests/stream_table_crash.Bug-3952.stable.err
    sql/test/BugTracker-2016/Tests/stream_table_crash.Bug-3952.stable.out
Branch: iot
Log Message:
Intermittent commit diffs (truncated from 813 to 300 lines): diff --git a/clients/Tests/exports.stable.out b/clients/Tests/exports.stable.out --- a/clients/Tests/exports.stable.out +++ b/clients/Tests/exports.stable.out @@ -2043,6 +2043,7 @@ void dumpExceptionsToStream(stream *out, void dumpHelpTable(stream *f, Module s, str text, int flag); void dumpSearchTable(stream *f, str text); str eqRef; +str errorRef; str escape_str(str *retval, str s); str evalFile(Client c, str fname, int listing); str evalRef; diff --git a/clients/Tests/malcheck.stable.out b/clients/Tests/malcheck.stable.out --- a/clients/Tests/malcheck.stable.out +++ b/clients/Tests/malcheck.stable.out @@ -10,7 +10,6 @@ BSKTthreshold: missing for MAL command t BSKTwindow: missing for MAL command window in sql/backends/monet5/iot/basket.mal BSKTtimewindow: missing for MAL command timewindow in sql/backends/monet5/iot/basket.mal BSKTbeat: missing for MAL command beat in sql/backends/monet5/iot/basket.mal -PNtype: missing for MAL pattern types in sql/backends/monet5/iot/petrinet.mal PNstep: missing for MAL pattern step in sql/backends/monet5/iot/petrinet.mal # 15:16:26 > diff --git a/sql/backends/monet5/iot/50_iot.sql b/sql/backends/monet5/iot/50_iot.sql --- a/sql/backends/monet5/iot/50_iot.sql +++ b/sql/backends/monet5/iot/50_iot.sql @@ -24,42 +24,33 @@ create procedure iot.query(qry string) create procedure iot.query("schema" string, name string) external name iot.query; --- pause the processing of a continuous query -create procedure iot.pause ("schema" string, name string) - external name iot.pause; +create procedure iot.activate("schema" string, name string) + external name iot.activate; -create procedure iot.pause () - external name iot.pause; +create procedure iot.activate() + external name iot.activate; --- resume the processing of a continuous query -create procedure iot.resume ("schema" string, name string) - external name iot.resume; +create procedure iot.deactivate("schema" string, name string) 
+ external name iot.deactivate; -create procedure iot.resume () - external name iot.resume; +create procedure iot.deactivate() + external name iot.deactivate; -- resume with limited the number of scheduler before next pause create procedure iot.cycles(n integer) external name iot.cycles; --- stop and remove a continuous query -create procedure iot.stop ("schema" string, name string) - external name iot.stop; - -create procedure iot.stop () - external name iot.stop; - -- deliver a new basket with tuples create procedure iot.push("schema" string, "table" string, dirpath string) external name iot.push; -- Inspection tables create function iot.baskets() -returns table( "schema" string, "table" string, threshold int, winsize int, winstride int, timeslice int, timestride int, beat int, seen timestamp, events int) +returns table( "schema" string, "table" string, "status" string, threshold int, winsize int, winstride int, timeslice int, timestride int, beat int, seen timestamp, events int) external name iot.baskets; create function iot.queries() - returns table( "schema" string, "function" string, status string, lastrun timestamp, cycles int, events int, time bigint, error string) + returns table( "schema" string, "function" string, "status" string, lastrun timestamp, cycles int, events int, time bigint, error string) external name iot.queries; create function iot.inputplaces() diff --git a/sql/backends/monet5/iot/Tests/iot00.sql b/sql/backends/monet5/iot/Tests/iot00.sql --- a/sql/backends/monet5/iot/Tests/iot00.sql +++ b/sql/backends/monet5/iot/Tests/iot00.sql @@ -12,9 +12,19 @@ end; call iot.query('iot','cq00'); call iot.query('insert into iot.result select min(t), count(*), avg(val) from iot.stream_tmp;'); -insert into stream_tmp values('2005-09-23 12:34:26.736',1,12.34); - select * from iot.baskets(); select * from iot.queries(); select * from iot.inputplaces(); select * from iot.outputplaces(); + +-- stop all continuous queries +call iot.deactivate(); + +insert into 
stream_tmp values('2005-09-23 12:34:26.736',1,12.34); +select * from stream_tmp; + +-- reactivate all continuous queries +call iot.activate(); + +select * from iot.baskets(); +select * from iot.queries(); diff --git a/sql/backends/monet5/iot/basket.c b/sql/backends/monet5/iot/basket.c --- a/sql/backends/monet5/iot/basket.c +++ b/sql/backends/monet5/iot/basket.c @@ -36,7 +36,7 @@ //#define _DEBUG_BASKET_ if(0) #define _DEBUG_BASKET_ -str statusname[6] = { "<unknown>", "init", "paused", "running", "stop", "error" }; +str statusname[3] = { "<unknown>", "running", "paused" }; BasketRec *baskets; /* the global iot catalog */ static int bsktTop = 0, bsktLimit = 0; @@ -178,6 +178,65 @@ BSKTregister(Client cntxt, MalBlkPtr mb, return msg; } +str +BSKTactivate(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) +{ + str sch, tbl; + int idx = 0; + + (void) cntxt; + (void) mb; + + if( pci->argc > pci->retc){ + sch = *getArgReference_str(stk, pci, 1); + tbl = *getArgReference_str(stk, pci, 2); + + /* check for registration */ + idx = BSKTlocate(sch, tbl); + if( idx == 0) + throw(SQL,"basket.activate","Stream table %s.%s not accessible\n",sch,tbl); + MT_lock_set(&baskets[idx].lock); + baskets[idx].status = BSKTRUNNING; + MT_lock_unset(&baskets[idx].lock); + } else { + for( idx =1; idx <bsktTop; idx++){ + MT_lock_set(&baskets[idx].lock); + baskets[idx].status = BSKTRUNNING; + MT_lock_unset(&baskets[idx].lock); + } + } + return MAL_SUCCEED; +} + +str +BSKTdeactivate(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) +{ + str sch, tbl; + int idx = 0; + + (void) cntxt; + (void) mb; + if( pci->argc > pci->retc){ + sch = *getArgReference_str(stk, pci, 1); + tbl = *getArgReference_str(stk, pci, 2); + + /* check for registration */ + idx = BSKTlocate(sch, tbl); + if( idx == 0) + throw(SQL,"basket.activate","Stream table %s.%s not accessible\n",sch,tbl); + MT_lock_set(&baskets[idx].lock); + baskets[idx].status = BSKTPAUSED; + MT_lock_unset(&baskets[idx].lock); + } else { + 
for( idx =1; idx <bsktTop; idx++){ + MT_lock_set(&baskets[idx].lock); + baskets[idx].status = BSKTPAUSED; + MT_lock_unset(&baskets[idx].lock); + } + } + return MAL_SUCCEED; +} + static BAT * BSKTbindColumn(Client cntxt, str sch, str tbl, str col) { @@ -435,12 +494,32 @@ BSKTappend(Client cntxt, MalBlkPtr mb, M return MAL_SUCCEED; } -InstrPtr -BSKTupdateInstruction(MalBlkPtr mb, str sch, str tbl) +str +BSKTcommit(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) { + str sname = *getArgReference_str(stk, pci, 2); + str tname = *getArgReference_str(stk, pci, 3); + int idx; + (void) cntxt; (void) mb; - (void) sch; - (void) tbl; + + idx = BSKTlocate(sname,tname); + if( idx == 0) + throw(SQL,"basket.commit","Stream column %s.%s not accessible\n",sname,tname); + + MT_lock_set(&baskets[idx].lock); + baskets[idx].count++; + MT_lock_unset(&baskets[idx].lock); + return MAL_SUCCEED; +} + +str +BSKTupdate (Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) +{ + (void) cntxt; + (void) mb; + (void) stk; + (void) pci; return NULL; } @@ -450,15 +529,16 @@ BSKTtable (Client cntxt, MalBlkPtr mb, M { bat *schemaId = getArgReference_bat(stk,pci,0); bat *nameId = getArgReference_bat(stk,pci,1); - bat *thresholdId = getArgReference_bat(stk,pci,1); - bat *winsizeId = getArgReference_bat(stk,pci,2); - bat *winstrideId = getArgReference_bat(stk,pci,3); - bat *timesliceId = getArgReference_bat(stk,pci,4); - bat *timestrideId = getArgReference_bat(stk,pci,5); - bat *beatId = getArgReference_bat(stk,pci,6); - bat *seenId = getArgReference_bat(stk,pci,7); - bat *eventsId = getArgReference_bat(stk,pci,8); - BAT *schema = NULL, *name = NULL, *seen = NULL, *events = NULL; + bat *statusId = getArgReference_bat(stk,pci,2); + bat *thresholdId = getArgReference_bat(stk,pci,3); + bat *winsizeId = getArgReference_bat(stk,pci,4); + bat *winstrideId = getArgReference_bat(stk,pci,5); + bat *timesliceId = getArgReference_bat(stk,pci,6); + bat *timestrideId = getArgReference_bat(stk,pci,7); + 
bat *beatId = getArgReference_bat(stk,pci,8); + bat *seenId = getArgReference_bat(stk,pci,9); + bat *eventsId = getArgReference_bat(stk,pci,10); + BAT *schema = NULL, *name = NULL, *status = NULL, *seen = NULL, *events = NULL; BAT *threshold = NULL, *winsize = NULL, *winstride = NULL, *beat = NULL; BAT *timeslice = NULL, *timestride = NULL; int i; @@ -473,7 +553,11 @@ BSKTtable (Client cntxt, MalBlkPtr mb, M name = BATnew(TYPE_void, TYPE_str, BATTINY, TRANSIENT); if (name == 0) goto wrapup; - BATseqbase(name, 0); + BATseqbase(status, 0); + status = BATnew(TYPE_void, TYPE_str, BATTINY, TRANSIENT); + if (status == 0) + goto wrapup; + BATseqbase(status, 0); threshold = BATnew(TYPE_void, TYPE_int, BATTINY, TRANSIENT); if (threshold == 0) goto wrapup; @@ -486,6 +570,14 @@ BSKTtable (Client cntxt, MalBlkPtr mb, M if (winstride == 0) goto wrapup; BATseqbase(winstride, 0); + timeslice = BATnew(TYPE_void, TYPE_int, BATTINY, TRANSIENT); + if (timeslice == 0) + goto wrapup; + BATseqbase(timeslice, 0); + timestride = BATnew(TYPE_void, TYPE_int, BATTINY, TRANSIENT); + if (timestride == 0) + goto wrapup; + BATseqbase(timestride, 0); beat = BATnew(TYPE_void, TYPE_int, BATTINY, TRANSIENT); if (beat == 0) goto wrapup; @@ -499,33 +591,27 @@ BSKTtable (Client cntxt, MalBlkPtr mb, M goto wrapup; BATseqbase(events, 0); - timeslice = BATnew(TYPE_void, TYPE_int, BATTINY, TRANSIENT); - if (timeslice == 0) - goto wrapup; - BATseqbase(timeslice, 0); - timestride = BATnew(TYPE_void, TYPE_int, BATTINY, TRANSIENT); - if (timestride == 0) - goto wrapup; - BATseqbase(timestride, 0); for (i = 1; i < bsktTop; i++) if (baskets[i].table_name) { BUNappend(schema, baskets[i].schema_name, FALSE); BUNappend(name, baskets[i].table_name, FALSE); + BUNappend(status, statusname[baskets[i].status], FALSE); BUNappend(threshold, &baskets[i].threshold, FALSE); BUNappend(winsize, &baskets[i].winsize, FALSE); BUNappend(winstride, &baskets[i].winstride, FALSE); + BUNappend(timeslice, &baskets[i].timeslice, FALSE); 
+ BUNappend(timestride, &baskets[i].timestride, FALSE); BUNappend(beat, &baskets[i].beat, FALSE); BUNappend(seen, &baskets[i].seen, FALSE); bn = BSKTbindColumn(cntxt,baskets[i].schema_name, baskets[i].table_name, baskets[i].cols[0]); baskets[i].events = bn ? (int) BATcount( bn): 0; BUNappend(events, &baskets[i].events, FALSE); _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list