Changeset: 391b3bd2e7b5 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=391b3bd2e7b5
Modified Files:
        clients/Tests/exports.stable.out
        clients/Tests/malcheck.stable.out
        sql/backends/monet5/iot/50_iot.sql
        sql/backends/monet5/iot/Tests/iot00.sql
        sql/backends/monet5/iot/basket.c
        sql/backends/monet5/iot/basket.h
        sql/backends/monet5/iot/basket.mal
        sql/backends/monet5/iot/iot.c
        sql/backends/monet5/iot/iot.h
        sql/backends/monet5/iot/iot.mal
        sql/backends/monet5/iot/petrinet.c
        sql/backends/monet5/iot/petrinet.h
        sql/backends/monet5/iot/petrinet.mal
        sql/backends/monet5/sql_optimizer.c
        sql/test/BugTracker-2016/Tests/stream_table_crash.Bug-3952.stable.err
        sql/test/BugTracker-2016/Tests/stream_table_crash.Bug-3952.stable.out
Branch: iot
Log Message:

Intermittent commit


diffs (truncated from 813 to 300 lines):

diff --git a/clients/Tests/exports.stable.out b/clients/Tests/exports.stable.out
--- a/clients/Tests/exports.stable.out
+++ b/clients/Tests/exports.stable.out
@@ -2043,6 +2043,7 @@ void dumpExceptionsToStream(stream *out,
 void dumpHelpTable(stream *f, Module s, str text, int flag);
 void dumpSearchTable(stream *f, str text);
 str eqRef;
+str errorRef;
 str escape_str(str *retval, str s);
 str evalFile(Client c, str fname, int listing);
 str evalRef;
diff --git a/clients/Tests/malcheck.stable.out b/clients/Tests/malcheck.stable.out
--- a/clients/Tests/malcheck.stable.out
+++ b/clients/Tests/malcheck.stable.out
@@ -10,7 +10,6 @@ BSKTthreshold: missing for MAL command t
 BSKTwindow: missing for MAL command window in sql/backends/monet5/iot/basket.mal
 BSKTtimewindow: missing for MAL command timewindow in sql/backends/monet5/iot/basket.mal
 BSKTbeat: missing for MAL command beat in sql/backends/monet5/iot/basket.mal
-PNtype: missing for MAL pattern types in sql/backends/monet5/iot/petrinet.mal
 PNstep: missing for MAL pattern step in sql/backends/monet5/iot/petrinet.mal
 
 # 15:16:26 >  
diff --git a/sql/backends/monet5/iot/50_iot.sql b/sql/backends/monet5/iot/50_iot.sql
--- a/sql/backends/monet5/iot/50_iot.sql
+++ b/sql/backends/monet5/iot/50_iot.sql
@@ -24,42 +24,33 @@ create procedure iot.query(qry string)
 create procedure iot.query("schema" string, name string)
        external name iot.query;
 
--- pause the processing of a continuous query
-create procedure iot.pause ("schema" string, name string)
-    external name iot.pause;
+create procedure iot.activate("schema" string, name string)
+       external name iot.activate;
 
-create procedure iot.pause ()
-    external name iot.pause;
+create procedure iot.activate()
+       external name iot.activate;
 
--- resume the processing of a continuous query
-create procedure iot.resume ("schema" string, name string)
-    external name iot.resume;
+create procedure iot.deactivate("schema" string, name string)
+       external name iot.deactivate;
 
-create procedure iot.resume ()
-    external name iot.resume;
+create procedure iot.deactivate()
+       external name iot.deactivate;
 
 -- resume with limited the number of scheduler before next pause
 create procedure iot.cycles(n integer)
        external name iot.cycles;
 
--- stop and remove a continuous query
-create procedure iot.stop ("schema" string, name string)
-    external name iot.stop;
-
-create procedure iot.stop ()
-    external name iot.stop;
-
 -- deliver a new basket with tuples
 create procedure iot.push("schema" string, "table" string, dirpath string)
        external name iot.push;
 
 -- Inspection tables
 create function iot.baskets()
-returns table( "schema" string,  "table" string, threshold int, winsize int, winstride int,  timeslice int, timestride int, beat int, seen timestamp, events int)
+returns table( "schema" string,  "table" string, "status" string,  threshold int, winsize int, winstride int,  timeslice int, timestride int, beat int, seen timestamp, events int)
 external name iot.baskets;
 
 create function iot.queries()
- returns table( "schema" string,  "function" string, status string, lastrun timestamp, cycles int, events int, time bigint, error string)
+ returns table( "schema" string,  "function" string, "status" string, lastrun timestamp, cycles int, events int, time bigint, error string)
  external name iot.queries;
 
 create function iot.inputplaces()
diff --git a/sql/backends/monet5/iot/Tests/iot00.sql b/sql/backends/monet5/iot/Tests/iot00.sql
--- a/sql/backends/monet5/iot/Tests/iot00.sql
+++ b/sql/backends/monet5/iot/Tests/iot00.sql
@@ -12,9 +12,19 @@ end;
 call iot.query('iot','cq00');
 call iot.query('insert into iot.result select min(t), count(*), avg(val) from iot.stream_tmp;');
 
-insert into stream_tmp values('2005-09-23 12:34:26.736',1,12.34);
-
 select * from  iot.baskets();
 select * from  iot.queries();
 select * from  iot.inputplaces();
 select * from  iot.outputplaces();
+
+-- stop all continuous queries
+call iot.deactivate();
+
+insert into stream_tmp values('2005-09-23 12:34:26.736',1,12.34);
+select * from stream_tmp;
+
+-- reactivate all continuous queries
+call iot.activate();
+
+select * from  iot.baskets();
+select * from  iot.queries();
diff --git a/sql/backends/monet5/iot/basket.c b/sql/backends/monet5/iot/basket.c
--- a/sql/backends/monet5/iot/basket.c
+++ b/sql/backends/monet5/iot/basket.c
@@ -36,7 +36,7 @@
 //#define _DEBUG_BASKET_ if(0)
 #define _DEBUG_BASKET_ 
 
-str statusname[6] = { "<unknown>", "init", "paused", "running", "stop", "error" };
+str statusname[3] = { "<unknown>", "running", "paused" };
 
 BasketRec *baskets;   /* the global iot catalog */
 static int bsktTop = 0, bsktLimit = 0;
@@ -178,6 +178,65 @@ BSKTregister(Client cntxt, MalBlkPtr mb,
        return msg;
 }
 
+str
+BSKTactivate(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+       str sch, tbl;
+       int idx = 0;
+
+       (void) cntxt;
+       (void) mb;
+
+       if( pci->argc > pci->retc){
+               sch = *getArgReference_str(stk, pci, 1);
+               tbl = *getArgReference_str(stk, pci, 2);
+
+               /* check for registration */
+               idx = BSKTlocate(sch, tbl);
+               if( idx == 0)
+                       throw(SQL,"basket.activate","Stream table %s.%s not accessible\n",sch,tbl);
+               MT_lock_set(&baskets[idx].lock);
+               baskets[idx].status = BSKTRUNNING;
+               MT_lock_unset(&baskets[idx].lock);
+       } else {
+               for( idx =1; idx <bsktTop;  idx++){
+                       MT_lock_set(&baskets[idx].lock);
+                       baskets[idx].status = BSKTRUNNING;
+                       MT_lock_unset(&baskets[idx].lock);
+               }
+       }
+       return MAL_SUCCEED;
+}
+
+str
+BSKTdeactivate(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+       str sch, tbl;
+       int idx = 0;
+
+       (void) cntxt;
+       (void) mb;
+       if( pci->argc > pci->retc){
+               sch = *getArgReference_str(stk, pci, 1);
+               tbl = *getArgReference_str(stk, pci, 2);
+
+               /* check for registration */
+               idx = BSKTlocate(sch, tbl);
+               if( idx == 0)
+                       throw(SQL,"basket.activate","Stream table %s.%s not accessible\n",sch,tbl);
+               MT_lock_set(&baskets[idx].lock);
+               baskets[idx].status = BSKTPAUSED;
+               MT_lock_unset(&baskets[idx].lock);
+       } else {
+               for( idx =1; idx <bsktTop;  idx++){
+                       MT_lock_set(&baskets[idx].lock);
+                       baskets[idx].status = BSKTPAUSED;
+                       MT_lock_unset(&baskets[idx].lock);
+               }
+       }
+       return MAL_SUCCEED;
+}
+
 static BAT *
 BSKTbindColumn(Client cntxt, str sch, str tbl, str col)
 {
@@ -435,12 +494,32 @@ BSKTappend(Client cntxt, MalBlkPtr mb, M
        return MAL_SUCCEED;
 }
 
-InstrPtr
-BSKTupdateInstruction(MalBlkPtr mb, str sch, str tbl)
+str
+BSKTcommit(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
 {
+    str sname = *getArgReference_str(stk, pci, 2);
+    str tname = *getArgReference_str(stk, pci, 3);
+       int idx;
+       (void) cntxt;
        (void) mb;
-       (void) sch;
-       (void) tbl;
+
+       idx = BSKTlocate(sname,tname);
+       if( idx == 0)
+               throw(SQL,"basket.commit","Stream column %s.%s not accessible\n",sname,tname);
+
+       MT_lock_set(&baskets[idx].lock);
+       baskets[idx].count++;
+       MT_lock_unset(&baskets[idx].lock);
+       return MAL_SUCCEED;
+}
+
+str
+BSKTupdate (Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+       (void) cntxt;
+       (void) mb;
+       (void) stk;
+       (void) pci;
        return NULL;
 }
 
@@ -450,15 +529,16 @@ BSKTtable (Client cntxt, MalBlkPtr mb, M
 {
        bat *schemaId = getArgReference_bat(stk,pci,0);
        bat *nameId = getArgReference_bat(stk,pci,1);
-       bat *thresholdId = getArgReference_bat(stk,pci,1);
-       bat *winsizeId = getArgReference_bat(stk,pci,2);
-       bat *winstrideId = getArgReference_bat(stk,pci,3);
-       bat *timesliceId = getArgReference_bat(stk,pci,4);
-       bat *timestrideId = getArgReference_bat(stk,pci,5);
-       bat *beatId = getArgReference_bat(stk,pci,6);
-       bat *seenId = getArgReference_bat(stk,pci,7);
-       bat *eventsId = getArgReference_bat(stk,pci,8);
-       BAT *schema = NULL, *name = NULL, *seen = NULL, *events = NULL;
+       bat *statusId = getArgReference_bat(stk,pci,2);
+       bat *thresholdId = getArgReference_bat(stk,pci,3);
+       bat *winsizeId = getArgReference_bat(stk,pci,4);
+       bat *winstrideId = getArgReference_bat(stk,pci,5);
+       bat *timesliceId = getArgReference_bat(stk,pci,6);
+       bat *timestrideId = getArgReference_bat(stk,pci,7);
+       bat *beatId = getArgReference_bat(stk,pci,8);
+       bat *seenId = getArgReference_bat(stk,pci,9);
+       bat *eventsId = getArgReference_bat(stk,pci,10);
+       BAT *schema = NULL, *name = NULL, *status = NULL,  *seen = NULL, *events = NULL;
        BAT *threshold = NULL, *winsize = NULL, *winstride = NULL, *beat = NULL;
        BAT *timeslice = NULL, *timestride = NULL;
        int i;
@@ -473,7 +553,11 @@ BSKTtable (Client cntxt, MalBlkPtr mb, M
        name = BATnew(TYPE_void, TYPE_str, BATTINY, TRANSIENT);
        if (name == 0)
                goto wrapup;
-       BATseqbase(name, 0);
+       BATseqbase(status, 0);
+       status = BATnew(TYPE_void, TYPE_str, BATTINY, TRANSIENT);
+       if (status == 0)
+               goto wrapup;
+       BATseqbase(status, 0);
        threshold = BATnew(TYPE_void, TYPE_int, BATTINY, TRANSIENT);
        if (threshold == 0)
                goto wrapup;
@@ -486,6 +570,14 @@ BSKTtable (Client cntxt, MalBlkPtr mb, M
        if (winstride == 0)
                goto wrapup;
        BATseqbase(winstride, 0);
+       timeslice = BATnew(TYPE_void, TYPE_int, BATTINY, TRANSIENT);
+       if (timeslice == 0)
+               goto wrapup;
+       BATseqbase(timeslice, 0);
+       timestride = BATnew(TYPE_void, TYPE_int, BATTINY, TRANSIENT);
+       if (timestride == 0)
+               goto wrapup;
+       BATseqbase(timestride, 0);
        beat = BATnew(TYPE_void, TYPE_int, BATTINY, TRANSIENT);
        if (beat == 0)
                goto wrapup;
@@ -499,33 +591,27 @@ BSKTtable (Client cntxt, MalBlkPtr mb, M
                goto wrapup;
        BATseqbase(events, 0);
 
-       timeslice = BATnew(TYPE_void, TYPE_int, BATTINY, TRANSIENT);
-       if (timeslice == 0)
-               goto wrapup;
-       BATseqbase(timeslice, 0);
-       timestride = BATnew(TYPE_void, TYPE_int, BATTINY, TRANSIENT);
-       if (timestride == 0)
-               goto wrapup;
-       BATseqbase(timestride, 0);
 
        for (i = 1; i < bsktTop; i++)
                if (baskets[i].table_name) {
                        BUNappend(schema, baskets[i].schema_name, FALSE);
                        BUNappend(name, baskets[i].table_name, FALSE);
+                       BUNappend(status, statusname[baskets[i].status], FALSE);
                        BUNappend(threshold, &baskets[i].threshold, FALSE);
                        BUNappend(winsize, &baskets[i].winsize, FALSE);
                        BUNappend(winstride, &baskets[i].winstride, FALSE);
+                       BUNappend(timeslice, &baskets[i].timeslice, FALSE);
+                       BUNappend(timestride, &baskets[i].timestride, FALSE);
                        BUNappend(beat, &baskets[i].beat, FALSE);
                        BUNappend(seen, &baskets[i].seen, FALSE);
                        bn = BSKTbindColumn(cntxt,baskets[i].schema_name, baskets[i].table_name, baskets[i].cols[0]);
                        baskets[i].events = bn ? (int) BATcount( bn): 0;
                        BUNappend(events, &baskets[i].events, FALSE);
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to