Changeset: ae4e6e2095ec for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=ae4e6e2095ec Modified Files: sql/backends/monet5/iot/50_iot.sql sql/backends/monet5/iot/basket.c sql/backends/monet5/iot/basket.h sql/backends/monet5/iot/iot.c sql/backends/monet5/iot/petrinet.c Branch: iot Log Message:
Add a private MAL client per continuous query; always make the errors table accessible. diffs (246 lines): diff --git a/sql/backends/monet5/iot/50_iot.sql b/sql/backends/monet5/iot/50_iot.sql --- a/sql/backends/monet5/iot/50_iot.sql +++ b/sql/backends/monet5/iot/50_iot.sql @@ -94,9 +94,9 @@ create function iot.outputs() returns table( "s" string, "t" string, "sch" string, "qry" string) external name iot.outputplaces; --- create function iot.errors() --- returns table( "schema" string, "table" string, error string) --- external name iot.errors; +create function iot.errors() +returns table( "table" string, error string) +external name iot.errors; -- tables for iotwebserver diff --git a/sql/backends/monet5/iot/basket.c b/sql/backends/monet5/iot/basket.c --- a/sql/backends/monet5/iot/basket.c +++ b/sql/backends/monet5/iot/basket.c @@ -74,8 +74,8 @@ BSKTclean(int idx) baskets[idx].table_name = NULL; BBPreclaim(baskets[idx].errors); + baskets[idx].errors = NULL; baskets[idx].winstride = -1; - baskets[idx].errors = NULL; baskets[idx].count = 0; } for(idx = 1; idx < bsktTop; idx++){ @@ -85,8 +85,8 @@ BSKTclean(int idx) baskets[idx].table_name = NULL; BBPreclaim(baskets[idx].errors); + baskets[idx].errors = NULL; baskets[idx].winstride = -1; - baskets[idx].errors = NULL; baskets[idx].count = 0; } } @@ -607,15 +607,11 @@ BSKTdump(void *ret) int bskt; BUN cnt; BAT *b; - mvc *m = NULL; str msg = MAL_SUCCEED; mnstr_printf(GDKout, "#baskets table\n"); for (bskt = 1; bskt < bsktLimit; bskt++) if (baskets[bskt].table_name) { - msg = getSQLContext(mal_clients, 0, &m, NULL); - if ( msg != MAL_SUCCEED) - break; cnt = 0; b = baskets[bskt].bats[0]; if( b) @@ -684,6 +680,48 @@ BSKTappend(Client cntxt, MalBlkPtr mb, M } str +BSKTupdate(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) +{ + int *res = getArgReference_int(stk, pci, 0); + str sname = *getArgReference_str(stk, pci, 2); + str tname = *getArgReference_str(stk, pci, 3); + str cname = *getArgReference_str(stk, pci, 4); + bat 
rows = *getArgReference_bat(stk, pci, 5); + bat val = *getArgReference_bat(stk, pci, 6); + BAT *bn=0, *rid=0, *bval = 0; + int bskt; + + return 0; + (void) cntxt; + (void) mb; + *res = 0; + + rid = BATdescriptor(rows); + if( rid == NULL) + throw(SQL, "basket.append", "Cannot access source oid descriptor"); + bval = BATdescriptor(val); + if( bval == NULL){ + BBPunfix(rid->batCacheid); + throw(SQL, "basket.append", "Cannot access source descriptor"); + } + + bskt = BSKTlocate(sname,tname); + if( bskt == 0) + throw(SQL, "basket.append", "Cannot access basket descriptor %s.%s",sname,tname); + bn = BSKTbindColumn(sname,tname,cname); + + if( bn){ + void_replace_bat(bn, rid, bval, TRUE); + BATderiveProps(bn, FALSE); + } else throw(SQL, "basket.append", "Cannot access target column %s.%s.%s",sname,tname,cname); + + baskets[bskt].status = BSKTFILLED; + BBPunfix(rid->batCacheid); + BBPunfix(bval->batCacheid); + return MAL_SUCCEED; +} + +str BSKTreset(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) { lng *res = getArgReference_lng(stk, pci, 0); @@ -894,7 +932,7 @@ BSKTtableerrors(bat *nameId, bat *errorI } for (i = 1; i < bsktTop; i++) - if (BATcount(baskets[i].errors) > 0) { + if (baskets[i].errors && BATcount(baskets[i].errors) > 0) { bi = bat_iterator(baskets[i].errors); BATloop(baskets[i].errors, p, q) { diff --git a/sql/backends/monet5/iot/basket.h b/sql/backends/monet5/iot/basket.h --- a/sql/backends/monet5/iot/basket.h +++ b/sql/backends/monet5/iot/basket.h @@ -89,6 +89,7 @@ iot_export int BSKTlocate(str sch, str t iot_export int BSKTunlocate(str sch, str tbl); iot_export int BSKTlocate(str sch, str tbl); iot_export str BSKTappend(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci); +iot_export str BSKTupdate(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci); iot_export str BSKTimportInternal(Client cntxt, int bskt); iot_export str BSKTimport(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci); iot_export str BSKTerror(Client cntxt, MalBlkPtr 
mb, MalStkPtr stk, InstrPtr pci); diff --git a/sql/backends/monet5/iot/iot.c b/sql/backends/monet5/iot/iot.c --- a/sql/backends/monet5/iot/iot.c +++ b/sql/backends/monet5/iot/iot.c @@ -171,12 +171,20 @@ static void IOTreceptorThread(void *dummy) { int idx = *(int*)dummy; + Client cntxt = MCinitClient(0, mal_clients[0].fdin, mal_clients[0].fdout); + + if( cntxt == NULL) + return; + //SQLinitClient(cntxt); _DEBUG_IOT_ mnstr_printf(IOTout, "#iot.receptor %s.%s started for %s\n", baskets[idx].schema_name, baskets[idx].table_name, baskets[idx].source); /* continously scan the container for baskets */ - BSKTimportInternal(mal_clients, idx); + BSKTimportInternal(cntxt, idx); + _DEBUG_IOT_ mnstr_printf(IOTout, "#iot.receptor %s.%s imported the file\n", + baskets[idx].schema_name, + baskets[idx].table_name); } str diff --git a/sql/backends/monet5/iot/petrinet.c b/sql/backends/monet5/iot/petrinet.c --- a/sql/backends/monet5/iot/petrinet.c +++ b/sql/backends/monet5/iot/petrinet.c @@ -62,6 +62,7 @@ typedef struct { str fcnname; MalBlkPtr mb; /* Query block */ MalStkPtr stk; /* might be handy */ + Client client; /* MAL client context for this query */ int status; /* query status waiting/running/paused */ int enabled; /* all baskets are available */ @@ -164,6 +165,13 @@ PNregisterInternal(Client cntxt, MalBlkP pnet[pnettop].mb = nmb; pnet[pnettop].stk = prepareMALstack(nmb, nmb->vsize); + pnet[pnettop].client = MCinitClient(0,0,0); + if ( pnet[pnettop].client == NULL) + throw(MAL,"petrinet.register","Failed to create client record for continous query"); + msg = SQLinitClient(pnet[pnettop].client); + if( msg) + return msg; + pnet[pnettop].status = PNWAIT; pnet[pnettop].cycles = 0; pnet[pnettop].seen = *timestamp_nil; @@ -238,12 +246,15 @@ PNstop(void){ int i,cnt; _DEBUG_PETRINET_ mnstr_printf(PNout, "#scheduler being stopped\n"); - pnstatus = PNSTOP; + pnstatus = PNSTOP; // avoid starting new continuous queries + for(cnt=0, i = 0; i < pnettop; i++) + if( pnet[i].client ) + 
pnet[i].client->itrace ='x'; + do{ MT_sleep_ms(20); - for(cnt=0, i = 0; i < pnettop; i++){ - cnt += pnet[i].status == PNRUNNING; - } + for(cnt=0, i = 0; i < pnettop; i++) + cnt += pnet[i].status != PNWAIT; } while(cnt); BSKTclean(0); _DEBUG_PETRINET_ mnstr_printf(PNout, "#all queries stopped \n"); @@ -271,6 +282,7 @@ PNderegister(Client cntxt, MalBlkPtr mb, } GDKfree(pnet[i].modname); GDKfree(pnet[i].fcnname); + MCcloseClient(pnet[i].client); for( ; i <pnettop-1;i++) pnet[i]= pnet[i+1]; memset((void*) (pnet+i), 0, sizeof(PNnode)); @@ -282,6 +294,7 @@ PNderegister(Client cntxt, MalBlkPtr mb, for ( i = 0; i < pnettop; i++){ GDKfree(pnet[i].modname); GDKfree(pnet[i].fcnname); + MCcloseClient(pnet[i].client); memset((void*) (pnet+i), 0, sizeof(PNnode)); } pnettop = 0; @@ -395,7 +408,7 @@ PNexecute( void *n) _DEBUG_PETRINET_ mnstr_printf(PNout, "#petrinet.execute %s.%s all locked\n",node->modname, node->fcnname); - msg = runMALsequence(mal_clients, node->mb, 1, 0, node->stk, 0, 0); + msg = runMALsequence(node->client, node->mb, 1, 0, node->stk, 0, 0); _DEBUG_PETRINET_ mnstr_printf(PNout, "#petrinet.execute %s.%s transition done:%s\n", @@ -425,8 +438,16 @@ PNscheduler(void *dummy) timestamp ts, tn; _DEBUG_PETRINET_ mnstr_printf(PNout, "#petrinet.controller started\n"); - cntxt = mal_clients; /* run as admin in SQL mode*/ - if( strcmp(cntxt->scenario, "sql") ) + cntxt = MCinitClient(0,0,0); /* run as admin in SQL mode*/ + if( cntxt){ + if( SQLinitClient(cntxt) != MAL_SUCCEED) + GDKerror("Could not initialize PNscheduler"); + }else{ + GDKerror("Could not initialize PNscheduler"); + return; + } + + if( cntxt->scenario == NULL || strcmp(cntxt->scenario, "sql") ) SQLinitEnvironment(cntxt, NULL, NULL, NULL); pnstatus = PNRUNNING; // global state @@ -545,6 +566,7 @@ PNscheduler(void *dummy) } } + MCcloseClient(cntxt); pnstatus = PNINIT; _DEBUG_PETRINET_ mnstr_flush(PNout); (void) dummy; _______________________________________________ checkin-list mailing list 
checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list