Changeset: 737505d184aa for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=737505d184aa Modified Files: sql/backends/monet5/iot/Tests/iot00.sql sql/backends/monet5/iot/basket.c sql/backends/monet5/iot/basket.h sql/backends/monet5/iot/basket.mal Branch: iot Log Message:
Minor changes diffs (206 lines): diff --git a/sql/backends/monet5/iot/Tests/iot00.sql b/sql/backends/monet5/iot/Tests/iot00.sql --- a/sql/backends/monet5/iot/Tests/iot00.sql +++ b/sql/backends/monet5/iot/Tests/iot00.sql @@ -12,7 +12,7 @@ end; call iot.query('iot','cq00'); call iot.query('insert into iot.result select min(t), count(*), avg(val) from iot.stream_tmp;'); ---insert into stream_tmp values('2005-09-23 12:34:26.736',1,12.34); +insert into stream_tmp values('2005-09-23 12:34:26.736',1,12.34); select * from iot.baskets(); select * from iot.queries(); diff --git a/sql/backends/monet5/iot/basket.c b/sql/backends/monet5/iot/basket.c --- a/sql/backends/monet5/iot/basket.c +++ b/sql/backends/monet5/iot/basket.c @@ -95,7 +95,7 @@ BSKTlocate(str sch, str tbl) static str BSKTnewbasket(sql_schema *s, sql_table *t) { - int idx; + int i, idx; node *o; // Don't introduce the same basket twice @@ -118,6 +118,14 @@ BSKTnewbasket(sql_schema *s, sql_table * throw(MAL,"baskets.register","Unsupported type %d",tpe); baskets[idx].count++; } + // collect the column names + baskets[idx].cols = (str*) GDKzalloc(sizeof(str) * baskets[idx].count+1); + for (i=0, o = t->columns.set->h; o; o = o->next){ + sql_column *col = o->data; + baskets[idx].cols[i++]= col->base.name; + } + + // baskets[idx].errors = BATnew(TYPE_void, TYPE_str, BATTINY, TRANSIENT); if (baskets[idx].table_name == NULL || baskets[idx].errors == NULL) { @@ -170,6 +178,31 @@ BSKTregister(Client cntxt, MalBlkPtr mb, return msg; } +static BAT * +BSKTbindColumn(Client cntxt, str sch, str tbl, str col) +{ + BAT *b = NULL; + mvc *m = NULL; + sql_schema *s = NULL; + sql_table *t = NULL; + sql_column *c = NULL; + + if( BSKTlocate(sch,tbl) < 0) + return b; + + if( getSQLContext(cntxt,NULL, &m, NULL) ) + return 0; + s= mvc_bind_schema(m, sch); + if ( s) + t= mvc_bind_table(m, s, tbl); + if ( t) + c= mvc_bind_column(m, t, col); + + + if( c) + b = store_funcs.bind_col(m->session->tr,c,RDONLY); + return b; +} str BSKTbind(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) { @@ -177,39 +210,13 @@ BSKTbind(Client cntxt, MalBlkPtr mb, Mal str sch = *getArgReference_str(stk,pci,1); str tbl = *getArgReference_str(stk,pci,2); str col = *getArgReference_str(stk,pci,3); - int idx; - mvc *m = NULL; - sql_schema *s = NULL; - sql_table *t = NULL; - sql_column *c = NULL; - BAT *b; - str msg; - (void) mb; - *ret = 0; - - msg= getSQLContext(cntxt,NULL, &m, NULL); - if( msg != MAL_SUCCEED) - return msg; - s= mvc_bind_schema(m, sch); - if ( s) - t= mvc_bind_table(m, s, tbl); - if ( t) - c= mvc_bind_column(m, t, col); - - idx= BSKTlocate(sch,tbl); - if (idx <= 0){ - msg= BSKTnewbasket(s, t); - if ( msg != MAL_SUCCEED) - return msg; - } - - if( c){ - b = store_funcs.bind_col(m->session->tr,c,RD_UPD_VAL); - if( b) - BBPkeepref(*ret = b->batCacheid); + BAT *b = BSKTbindColumn(cntxt, sch,tbl,col); + if( b){ + BBPkeepref(*ret = b->batCacheid); return MAL_SUCCEED; } + (void) mb; throw(SQL,"iot.bind","Stream table column '%s.%s.%s' not found",sch,tbl,col); } @@ -313,8 +320,8 @@ BSKTpush(Client cntxt, MalBlkPtr mb, Mal sql_column *c = n->data; snprintf(buf,BUFSIZ, "%s%c%s",dir,DIR_SEP, c->base.name); _DEBUG_BASKET_ mnstr_printf(BSKTout,"Attach the file %s\n",buf); - BATattach(c->type.type->localtype,buf,PERSISTENT); - b = store_funcs.bind_col(m->session->tr,c,RD_UPD_VAL); + //BATattach(c->type.type->localtype,buf,PERSISTENT); + b = store_funcs.bind_col(m->session->tr,c,RDONLY); if( b){ baskets[bskt].count = BATcount(b); BBPunfix(b->batCacheid); @@ -352,7 +359,7 @@ BSKTdump(void *ret) cnt = 0; n = baskets[bskt].table->columns.set->h; c = n->data; - b = store_funcs.bind_col(m->session->tr,c,RD_UPD_VAL); + b = store_funcs.bind_col(m->session->tr,c,RDONLY); if( b){ cnt = BATcount(b); BBPunfix(b->batCacheid); @@ -414,7 +421,7 @@ BSKTappend(Client cntxt, MalBlkPtr mb, M c= mvc_bind_column(m, t, cname); else throw(SQL,"basket.append","Stream table %s.%s not accessible\n",sname,tname); if( c) { - bn = store_funcs.bind_col(m->session->tr,c,RD_UPD_VAL); + bn = store_funcs.bind_col(m->session->tr,c,RDONLY); if( bn){ if( tpe == TYPE_bat) BATappend(bn, b, TRUE); @@ -439,12 +446,25 @@ BSKTupdateInstruction(MalBlkPtr mb, str /* provide a tabular view for inspection */ str -BSKTtable(bat *schemaId, bat *nameId, bat *thresholdId, bat * winsizeId, bat *winstrideId, bat *timesliceId, bat *timestrideId, bat *beatId, bat *seenId, bat *eventsId) +BSKTtable (Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) { - BAT *schema= NULL, *name = NULL, *seen = NULL, *events = NULL; + bat *schemaId = getArgReference_bat(stk,pci,0); + bat *nameId = getArgReference_bat(stk,pci,1); + bat *thresholdId = getArgReference_bat(stk,pci,1); + bat *winsizeId = getArgReference_bat(stk,pci,2); + bat *winstrideId = getArgReference_bat(stk,pci,3); + bat *timesliceId = getArgReference_bat(stk,pci,4); + bat *timestrideId = getArgReference_bat(stk,pci,5); + bat *beatId = getArgReference_bat(stk,pci,6); + bat *seenId = getArgReference_bat(stk,pci,7); + bat *eventsId = getArgReference_bat(stk,pci,8); + BAT *schema = NULL, *name = NULL, *seen = NULL, *events = NULL; BAT *threshold = NULL, *winsize = NULL, *winstride = NULL, *beat = NULL; BAT *timeslice = NULL, *timestride = NULL; int i; + BAT *bn = NULL; + + (void) mb; schema = BATnew(TYPE_void, TYPE_str, BATTINY, TRANSIENT); if (schema == 0) @@ -497,7 +517,8 @@ BSKTtable(bat *schemaId, bat *nameId, ba BUNappend(winstride, &baskets[i].winstride, FALSE); BUNappend(beat, &baskets[i].beat, FALSE); BUNappend(seen, &baskets[i].seen, FALSE); - baskets[i].events = 0; //(int) BATcount( baskets[i].bats[0]); + bn = BSKTbindColumn(cntxt,baskets[i].schema_name, baskets[i].table_name, baskets[i].cols[0]); + baskets[i].events = bn ? (int) BATcount( bn): 0; BUNappend(events, &baskets[i].events, FALSE); BUNappend(timeslice, &baskets[i].timeslice, FALSE); BUNappend(timestride, &baskets[i].timestride, FALSE); diff --git a/sql/backends/monet5/iot/basket.h b/sql/backends/monet5/iot/basket.h --- a/sql/backends/monet5/iot/basket.h +++ b/sql/backends/monet5/iot/basket.h @@ -82,7 +82,7 @@ iot_export str BSKTreset(void *ret); iot_export int BSKTlocate(str sch, str tbl); iot_export str BSKTdump(void *ret); -iot_export str BSKTtable(bat *schemaId, bat *nameId, bat *thresholdId, bat * winsizeId, bat *winstrideId,bat *timesliceId, bat *timestrideId, bat *beatId, bat *seenId, bat *eventsId); +iot_export str BSKTtable( Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci); iot_export str BSKTtableerrors(bat *nmeId, bat *errorId); iot_export str BSKTerror(int *ret, str *sch, str *fcn, str *msg); diff --git a/sql/backends/monet5/iot/basket.mal b/sql/backends/monet5/iot/basket.mal --- a/sql/backends/monet5/iot/basket.mal +++ b/sql/backends/monet5/iot/basket.mal @@ -77,7 +77,7 @@ pattern iot.push(sch:str, tbl:str, dir:s address BSKTpush comment "Push a directory with the binary files"; -command iot.baskets()(sch:bat[:str],nme:bat[:str], threshold:bat[:int], winsize:bat[:int], winstride:bat[:int], timeslice:bat[:int], +pattern iot.baskets()(sch:bat[:str],nme:bat[:str], threshold:bat[:int], winsize:bat[:int], winstride:bat[:int], timeslice:bat[:int], timestride:bat[:int], beat:bat[:int], seen:bat[:timestamp], events:bat[:int]) address BSKTtable comment "Inspect the iot baskets"; _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list