Changeset: 6b0bc7248043 for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=6b0bc7248043 Modified Files: sql/backends/monet5/iot/Tests/iot00.sql sql/backends/monet5/iot/basket.c sql/backends/monet5/iot/petrinet.c sql/include/sql_catalog.h sql/server/rel_schema.c Branch: iot Log Message:
Use basic relational tables make sure you lock the baskets before using. diffs (200 lines): diff --git a/sql/backends/monet5/iot/Tests/iot00.sql b/sql/backends/monet5/iot/Tests/iot00.sql --- a/sql/backends/monet5/iot/Tests/iot00.sql +++ b/sql/backends/monet5/iot/Tests/iot00.sql @@ -12,6 +12,8 @@ end; call iot.query('iot','cq00'); call iot.query('insert into iot.result select min(t), count(*), avg(val) from iot.stream_tmp;'); +--insert into stream_tmp values('2005-09-23 12:34:26.736',1,12.34); + select * from iot.baskets(); select * from iot.queries(); select * from iot.inputplaces(); diff --git a/sql/backends/monet5/iot/basket.c b/sql/backends/monet5/iot/basket.c --- a/sql/backends/monet5/iot/basket.c +++ b/sql/backends/monet5/iot/basket.c @@ -110,8 +110,14 @@ BSKTnewbasket(sql_schema *s, sql_table * baskets[idx].seen = * timestamp_nil; baskets[idx].count = 0; - for (o = t->columns.set->h; o; o = o->next) + for (o = t->columns.set->h; o; o = o->next){ + sql_column *col = o->data; + int tpe = col->type.type->localtype; + + if (tpe < TYPE_str || tpe == TYPE_date || tpe == TYPE_daytime || tpe == TYPE_timestamp) + throw(MAL,"baskets.register","Unsupported type"); baskets[idx].count++; + } baskets[idx].errors = BATnew(TYPE_void, TYPE_str, BATTINY, TRANSIENT); if (baskets[idx].table_name == NULL || baskets[idx].errors == NULL) { @@ -280,7 +286,15 @@ BSKTpush(Client cntxt, MalBlkPtr mb, Mal int bskt; char buf[BUFSIZ]; node *n; + mvc *m = NULL; + BAT *b; + int first=1; + BUN cnt =0; + str msg; + msg= getSQLContext(cntxt,NULL, &m, NULL); + if( msg != MAL_SUCCEED) + return msg; bskt = BSKTlocate(sch,tbl); if (bskt == 0) throw(SQL, "iot.push", "Could not find the basket %s.%s",sch,tbl); @@ -290,12 +304,28 @@ BSKTpush(Client cntxt, MalBlkPtr mb, Mal throw(SQL, "iot.push", "Could not access the basket directory %s. error %d",dir,errno); } + // types are already checked during stream initialization + MT_lock_set(&baskets[bskt].lock); for( n = baskets[bskt].table->columns.set->h; n; n= n->next){ sql_column *c = n->data; snprintf(buf,BUFSIZ, "%s%c%s",dir,DIR_SEP, c->base.name); _DEBUG_BASKET_ mnstr_printf(BSKTout,"Attach the file %s\n",buf); + BATattach(c->type.type->localtype,buf,PERSISTENT); + b = store_funcs.bind_col(m->session->tr,c,RD_UPD_VAL); + if( b){ + baskets[bskt].count = BATcount(b); + BBPunfix(b->batCacheid); + if( first){ + cnt = BATcount(b); + first = 0; + } else + if( cnt != BATcount(b)){ + MT_lock_unset(&baskets[bskt].lock); + throw(MAL,"iot.push","Non-aligned binary input files"); + } + } } - (void) cntxt; + MT_lock_unset(&baskets[bskt].lock); (void) mb; return MAL_SUCCEED; } diff --git a/sql/backends/monet5/iot/petrinet.c b/sql/backends/monet5/iot/petrinet.c --- a/sql/backends/monet5/iot/petrinet.c +++ b/sql/backends/monet5/iot/petrinet.c @@ -292,14 +292,30 @@ PNanalysis(Client cntxt, MalBlkPtr mb, i * experiment with more advanced schemes, e.g., priority queues. * * During each step cycle we first enable the transformations. + * + * Locking the streams is necessary to avoid concurrent changes. + * Using a fixed order over the basket table, ensure no deadlock. */ static void PNexecute( void *n) { PNnode *node= (PNnode *) n; - _DEBUG_PETRINET_ mnstr_printf(PNout, "#petrinet.executed %s.%s\n",node->modname, node->fcnname); + int j, idx; + _DEBUG_PETRINET_ mnstr_printf(PNout, "#petrinet.execute %s.%s\n",node->modname, node->fcnname); + // first grab exclusive access to all streams. + for (j = 0; j < MAXBSKT && node->enabled && node->places[j]; j++) { + idx = node->places[j]; + MT_lock_set(&baskets[idx].lock); + } + _DEBUG_PETRINET_ mnstr_printf(PNout, "#petrinet.execute %s.%s all locked\n",node->modname, node->fcnname); runMALsequence(mal_clients, node->mb, 1, 0, node->stk, 0, 0); node->status = BSKTPAUSE; + _DEBUG_PETRINET_ mnstr_printf(PNout, "#petrinet.execute %s.%s transition done\n",node->modname, node->fcnname); + for (j = MAXBSKT; j > 0 && node->enabled && node->places[j]; j--) { + idx = node->places[j]; + MT_lock_unset(&baskets[idx].lock); + } + _DEBUG_PETRINET_ mnstr_printf(PNout, "#petrinet.execute %s.%s all unlocked\n",node->modname, node->fcnname); } static void @@ -332,13 +348,13 @@ PNcontroller(void *dummy) /* collect latest statistics, note that we don't need a lock here, because the count need not be accurate to the usec. It will simply come back. We also only have to check the places that are marked - empty. */ + non empty. */ for(i=0; i< MAXBSKT; i++) claimed[i]=0; now = GDKusec(); for (k = i = 0; status == BSKTRUNNING && i < pnettop; i++) if ( pnet[i].status != BSKTPAUSE ){ - // check if all baskets are available + // check if all baskets are available and non-empty pnet[i].enabled = 1; for (j = 0; j < MAXBSKT && pnet[i].enabled && pnet[i].places[j]; j++) { idx = pnet[i].places[j]; @@ -365,7 +381,9 @@ PNcontroller(void *dummy) _DEBUG_PETRINET_ mnstr_printf(cntxt->fdout, "#petrinet: %s.%s enabled twice,disgarded \n", pnet[i].modname, pnet[i].fcnname); pnet[i].enabled = 0; break; - } + } + + /* rule out all others */ if( pnet[i].enabled) for (j = 0; j < MAXBSKT && pnet[i].enabled && pnet[i].places[j]; j++) claimed[pnet[i].places[j]]= 1; @@ -377,10 +395,8 @@ PNcontroller(void *dummy) } analysis = GDKusec() - now; - /* execute each enabled transformation */ - /* We don't need to access again all the factories and check again which are available to execute them - * we have already kept the enable ones in the enabled list (created in the previous loop) - * and now it is enough to access that list*/ + /* Execute each enabled transformation */ + /* Tricky part is here a single stream used by multiple transitions */ for (m = 0; m < k; m++) { i = enabled[m]; if (pnet[i].enabled ) { diff --git a/sql/include/sql_catalog.h b/sql/include/sql_catalog.h --- a/sql/include/sql_catalog.h +++ b/sql/include/sql_catalog.h @@ -461,7 +461,7 @@ typedef enum table_types { tt_replica_table = 6 /* multiple replica of the same table */ } table_types; -#define isTable(x) (x->type==tt_table) +#define isTable(x) (x->type==tt_table || x->type == tt_stream) #define isView(x) (x->type==tt_view) #define isMergeTable(x) (x->type==tt_merge_table) #define isStream(x) (x->type==tt_stream) diff --git a/sql/server/rel_schema.c b/sql/server/rel_schema.c --- a/sql/server/rel_schema.c +++ b/sql/server/rel_schema.c @@ -877,12 +877,12 @@ rel_create_table(mvc *sql, sql_schema *s if (sname && !(s = mvc_bind_schema(sql, sname))) return sql_error(sql, 02, "3F000!CREATE TABLE: no such schema '%s'", sname); - if (temp != SQL_PERSIST && tt == tt_table && + if (temp != SQL_PERSIST && (tt == tt_table || tt == tt_stream) && commit_action == CA_COMMIT) commit_action = CA_DELETE; if (temp != SQL_DECLARED_TABLE) { - if (temp != SQL_PERSIST && tt == tt_table) { + if (temp != SQL_PERSIST && (tt == tt_table || tt == tt_stream)) { s = mvc_bind_schema(sql, "tmp"); if (temp == SQL_LOCAL_TEMP && sname && strcmp(sname, s->base.name) != 0) return sql_error(sql, 02, "3F000!CREATE TABLE: local tempory tables should be stored in the '%s' schema", s->base.name); @@ -922,7 +922,7 @@ rel_create_table(mvc *sql, sql_schema *s if (res == SQL_ERR) return NULL; } - temp = (tt == tt_table)?temp:SQL_PERSIST; + temp = (tt == tt_table || tt == tt_stream)?temp:SQL_PERSIST; return rel_table(sql, DDL_CREATE_TABLE, sname, t, temp); } else { /* [col name list] as subquery with or without data */ sql_rel *sq = NULL, *res = NULL; @@ -944,7 +944,7 @@ rel_create_table(mvc *sql, sql_schema *s } /* insert query result into this table */ - temp = (tt == tt_table)?temp:SQL_PERSIST; + temp = (tt == tt_table || tt == tt_stream)?temp:SQL_PERSIST; res = rel_table(sql, DDL_CREATE_TABLE, sname, t, temp); if (with_data) { res = rel_insert(sql, res, sq); _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list