Changeset: e3bf585cd53a for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=e3bf585cd53a
Modified Files:
	sql/backends/monet5/iot/basket.c
	sql/backends/monet5/iot/basket.h
	sql/backends/monet5/iot/petrinet.c
	sql/backends/monet5/sql_optimizer.c
Branch: iot
Log Message:
Stay closer to the structures of the SQL catalog diffs (truncated from 420 to 300 lines): diff --git a/sql/backends/monet5/iot/basket.c b/sql/backends/monet5/iot/basket.c --- a/sql/backends/monet5/iot/basket.c +++ b/sql/backends/monet5/iot/basket.c @@ -54,7 +54,7 @@ static int BSKTnewEntry(void) baskets = (BasketRec *) GDKrealloc(baskets, bsktLimit * sizeof(BasketRec)); } for (i = 1; i < bsktLimit; i++) - if (baskets[i].table == NULL) + if (baskets[i].table_name == NULL) break; bsktTop++; return i; @@ -65,19 +65,11 @@ static int BSKTnewEntry(void) static void BSKTclean(int idx) { - int i; - GDKfree(baskets[idx].schema); - GDKfree(baskets[idx].table); - baskets[idx].schema = NULL; - baskets[idx].table = NULL; - if (baskets[idx].cols) { - for (i = 0; i < baskets[idx].count; i++) - GDKfree(baskets[idx].cols[i]); - GDKfree(baskets[idx].cols); - baskets[idx].cols = NULL; - } - GDKfree(baskets[idx].bats); - baskets[idx].bats = NULL; + GDKfree(baskets[idx].schema_name); + GDKfree(baskets[idx].table_name); + baskets[idx].schema_name = NULL; + baskets[idx].table_name = NULL; + BBPreclaim(baskets[idx].errors); baskets[idx].errors = NULL; baskets[idx].count = 0; @@ -93,8 +85,8 @@ BSKTlocate(str sch, str tbl) if( sch == 0 || tbl == 0) return 0; for (i = 1; i < bsktTop; i++) - if (baskets[i].schema && strcmp(sch, baskets[i].schema) == 0 && - baskets[i].table && strcmp(tbl, baskets[i].table) == 0) + if (baskets[i].schema_name && strcmp(sch, baskets[i].schema_name) == 0 && + baskets[i].table_name && strcmp(tbl, baskets[i].table_name) == 0) return i; return 0; } @@ -103,57 +95,35 @@ BSKTlocate(str sch, str tbl) static str BSKTnewbasket(sql_schema *s, sql_table *t) { - int idx, i; + int idx; node *o; - BAT *b= NULL; - sql_column *c; - str msg= MAL_SUCCEED; // Don't introduce the same basket twice if( BSKTlocate(s->base.name, t->base.name) > 0) - return msg; + return MAL_SUCCEED; MT_lock_set(&iotLock); idx = BSKTnewEntry(); MT_lock_init(&baskets[idx].lock,"newbasket"); - 
baskets[idx].schema = GDKstrdup(s->base.name); - baskets[idx].table = GDKstrdup(t->base.name); + baskets[idx].schema_name = GDKstrdup(s->base.name); + baskets[idx].table_name = GDKstrdup(t->base.name); baskets[idx].seen = * timestamp_nil; baskets[idx].count = 0; for (o = t->columns.set->h; o; o = o->next) baskets[idx].count++; - baskets[idx].cols = GDKzalloc((baskets[idx].count + 1) * sizeof(str)); - baskets[idx].bats = GDKzalloc((baskets[idx].count + 1) * sizeof(BAT *)); baskets[idx].errors = BATnew(TYPE_void, TYPE_str, BATTINY, TRANSIENT); - if (baskets[idx].table == NULL || - baskets[idx].cols == NULL || - baskets[idx].bats == NULL || + if (baskets[idx].table_name == NULL || baskets[idx].errors == NULL) { BSKTclean(idx); MT_lock_unset(&iotLock); throw(MAL,"baskets.register",MAL_MALLOC_FAIL); } - i = 0; - for (o = t->columns.set->h; o; o = o->next) { - c = o->data; - b= BATnew(TYPE_void, c->type.type->localtype, 0, TRANSIENT); - if (b == NULL) { - BSKTclean(idx); - MT_lock_unset(&iotLock); - throw(MAL,"baskets.register","Can not locate stream column '%s.%s.%s'",s->base.name, t->base.name, c->base.name); - } - baskets[idx].bats[i] = b->batCacheid; - if ((baskets[idx].cols[i++] = GDKstrdup(c->base.name)) == NULL) { - BSKTclean(idx); - MT_lock_unset(&iotLock); - throw(MAL,"baskets.register",MAL_MALLOC_FAIL); - } - BBPkeepref(b->batCacheid); - } + baskets[idx].schema = s; + baskets[idx].table = t; MT_lock_unset(&iotLock); - return msg; + return MAL_SUCCEED; } // MAL/SQL interface for registration of a single table @@ -201,10 +171,14 @@ BSKTbind(Client cntxt, MalBlkPtr mb, Mal str sch = *getArgReference_str(stk,pci,1); str tbl = *getArgReference_str(stk,pci,2); str col = *getArgReference_str(stk,pci,3); - int idx,i; + int idx; + mvc *m = NULL; + sql_schema *s = NULL; + sql_table *t = NULL; + sql_column *c = NULL; BAT *b; + str msg; - (void) cntxt; (void) mb; *ret = 0; @@ -212,13 +186,21 @@ BSKTbind(Client cntxt, MalBlkPtr mb, Mal if (idx <= 0) 
throw(SQL,"iot.bind","Stream table '%s.%s' not registered",sch,tbl); - for(i=0; i < baskets[idx].count; i++) - if( strcmp(baskets[idx].cols[i], col)== 0 ){ - b= BATdescriptor(baskets[idx].bats[i]); - if( b) - BBPkeepref(*ret = b->batCacheid); - return MAL_SUCCEED; - } + msg= getSQLContext(cntxt,NULL, &m, NULL); + if( msg != MAL_SUCCEED) + return msg; + s= mvc_bind_schema(m, sch); + if ( s) + t= mvc_bind_table(m, s, tbl); + if ( t) + c= mvc_bind_column(m, t, col); + + if( c){ + b = store_funcs.bind_col(m->session->tr,c,RD_UPD_VAL); + if( b) + BBPkeepref(*ret = b->batCacheid); + return MAL_SUCCEED; + } throw(SQL,"iot.bind","Stream table column '%s.%s.%s' not found",sch,tbl,col); } @@ -283,7 +265,7 @@ BSKTreset(void *ret) int i; (void) ret; for (i = 1; i < bsktLimit; i++) - if (baskets[i].table) + if (baskets[i].table_name) BSKTclean(i); return MAL_SUCCEED; } @@ -295,8 +277,9 @@ BSKTpush(Client cntxt, MalBlkPtr mb, Mal str sch = *getArgReference_str(stk, pci, 1); str tbl = *getArgReference_str(stk, pci, 2); str dir = *getArgReference_str(stk, pci, 3); - int bskt,i; + int bskt; char buf[BUFSIZ]; + node *n; bskt = BSKTlocate(sch,tbl); if (bskt == 0) @@ -307,14 +290,13 @@ BSKTpush(Client cntxt, MalBlkPtr mb, Mal throw(SQL, "iot.push", "Could not access the basket directory %s. 
error %d",dir,errno); } - for(i=0; i < baskets[bskt].count ; i++){ - snprintf(buf,BUFSIZ, "%s%c%s",dir,DIR_SEP, baskets[bskt].cols[i]); + for( n = baskets[bskt].table->columns.set->h; n; n= n->next){ + sql_column *c = n->data; + snprintf(buf,BUFSIZ, "%s%c%s",dir,DIR_SEP, c->base.name); _DEBUG_BASKET_ mnstr_printf(BSKTout,"Attach the file %s\n",buf); } (void) cntxt; (void) mb; - (void) stk; - (void) pci; return MAL_SUCCEED; } str @@ -323,20 +305,30 @@ BSKTdump(void *ret) int bskt; BUN cnt; BAT *b; + node *n; + sql_column *c; + mvc *m = NULL; + str msg = MAL_SUCCEED; mnstr_printf(GDKout, "#baskets table\n"); for (bskt = 1; bskt < bsktLimit; bskt++) - if (baskets[bskt].table) { + if (baskets[bskt].table_name) { + msg = getSQLContext(mal_clients, 0, &m, NULL); + if ( msg != MAL_SUCCEED) + break; cnt = 0; - if( baskets[bskt].bats[0]){ - b = BBPquickdesc(baskets[bskt].bats[0], TRUE); - if( b) - cnt = BATcount(b); + n = baskets[bskt].table->columns.set->h; + c = n->data; + b = store_funcs.bind_col(m->session->tr,c,RD_UPD_VAL); + if( b){ + cnt = BATcount(b); + BBPunfix(b->batCacheid); } + mnstr_printf(GDKout, "#baskets[%2d] %s.%s columns %d threshold %d window=[%d,%d] time window=[" LLFMT "," LLFMT "] beat " LLFMT " milliseconds" BUNFMT"\n", bskt, - baskets[bskt].schema, - baskets[bskt].table, + baskets[bskt].schema_name, + baskets[bskt].table_name, baskets[bskt].count, baskets[bskt].threshold, baskets[bskt].winsize, @@ -348,7 +340,7 @@ BSKTdump(void *ret) } (void) ret; - return MAL_SUCCEED; + return msg; } str @@ -364,6 +356,10 @@ BSKTappend(Client cntxt, MalBlkPtr mb, M InstrPtr BSKTupdateInstruction(MalBlkPtr mb, str sch, str tbl) { + (void) mb; + (void) sch; + (void) tbl; +/* int i, j, bskt; InstrPtr p; BAT *b; @@ -383,6 +379,8 @@ BSKTupdateInstruction(MalBlkPtr mb, str p = pushArgument(mb, p, j); } return p; +*/ + return NULL; } /* provide a tabular view for inspection */ @@ -437,9 +435,9 @@ BSKTtable(bat *schemaId, bat *nameId, ba BATseqbase(timestride, 0); for (i = 
1; i < bsktTop; i++) - if (baskets[i].table) { - BUNappend(schema, baskets[i].schema, FALSE); - BUNappend(name, baskets[i].table, FALSE); + if (baskets[i].table_name) { + BUNappend(schema, baskets[i].schema_name, FALSE); + BUNappend(name, baskets[i].table_name, FALSE); BUNappend(threshold, &baskets[i].threshold, FALSE); BUNappend(winsize, &baskets[i].winsize, FALSE); BUNappend(winstride, &baskets[i].winstride, FALSE); @@ -506,7 +504,7 @@ BSKTtableerrors(bat *nameId, bat *errorI BATloop(baskets[i].errors, p, q) { str err = BUNtail(bi, p); - BUNappend(name, &baskets[i].table, FALSE); + BUNappend(name, &baskets[i].table_name, FALSE); BUNappend(error, err, FALSE); } } diff --git a/sql/backends/monet5/iot/basket.h b/sql/backends/monet5/iot/basket.h --- a/sql/backends/monet5/iot/basket.h +++ b/sql/backends/monet5/iot/basket.h @@ -41,15 +41,17 @@ typedef struct{ MT_Lock lock; - str schema; /* schema for the basket */ - str table; /* table that represents the basket */ + str schema_name; /* schema for the basket */ + str table_name; /* table that represents the basket */ + sql_schema *schema; + sql_table *table; _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list