Changeset: e3bf585cd53a for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=e3bf585cd53a
Modified Files:
        sql/backends/monet5/iot/basket.c
        sql/backends/monet5/iot/basket.h
        sql/backends/monet5/iot/petrinet.c
        sql/backends/monet5/sql_optimizer.c
Branch: iot
Log Message:

Stay closer to the structures of the SQL catalog


diffs (truncated from 420 to 300 lines):

diff --git a/sql/backends/monet5/iot/basket.c b/sql/backends/monet5/iot/basket.c
--- a/sql/backends/monet5/iot/basket.c
+++ b/sql/backends/monet5/iot/basket.c
@@ -54,7 +54,7 @@ static int BSKTnewEntry(void)
                baskets = (BasketRec *) GDKrealloc(baskets, bsktLimit * 
sizeof(BasketRec));
        }
        for (i = 1; i < bsktLimit; i++)
-               if (baskets[i].table == NULL)
+               if (baskets[i].table_name == NULL)
                        break;
        bsktTop++;
        return i;
@@ -65,19 +65,11 @@ static int BSKTnewEntry(void)
 static void
 BSKTclean(int idx)
 {
-       int i;
-       GDKfree(baskets[idx].schema);
-       GDKfree(baskets[idx].table);
-       baskets[idx].schema = NULL;
-       baskets[idx].table = NULL;
-       if (baskets[idx].cols) {
-               for (i = 0; i < baskets[idx].count; i++)
-                       GDKfree(baskets[idx].cols[i]);
-               GDKfree(baskets[idx].cols);
-               baskets[idx].cols = NULL;
-       }
-       GDKfree(baskets[idx].bats);
-       baskets[idx].bats = NULL;
+       GDKfree(baskets[idx].schema_name);
+       GDKfree(baskets[idx].table_name);
+       baskets[idx].schema_name = NULL;
+       baskets[idx].table_name = NULL;
+
        BBPreclaim(baskets[idx].errors);
        baskets[idx].errors = NULL;
        baskets[idx].count = 0;
@@ -93,8 +85,8 @@ BSKTlocate(str sch, str tbl)
        if( sch == 0 || tbl == 0)
                return 0;
        for (i = 1; i < bsktTop; i++)
-               if (baskets[i].schema && strcmp(sch, baskets[i].schema) == 0 &&
-                       baskets[i].table && strcmp(tbl, baskets[i].table) == 0)
+               if (baskets[i].schema_name && strcmp(sch, 
baskets[i].schema_name) == 0 &&
+                       baskets[i].table_name && strcmp(tbl, 
baskets[i].table_name) == 0)
                        return i;
        return 0;
 }
@@ -103,57 +95,35 @@ BSKTlocate(str sch, str tbl)
 static str
 BSKTnewbasket(sql_schema *s, sql_table *t)
 {
-       int idx, i;
+       int idx;
        node *o;
-       BAT *b=  NULL;
-       sql_column  *c;
-       str msg= MAL_SUCCEED;
 
        // Don't introduce the same basket twice
        if( BSKTlocate(s->base.name, t->base.name) > 0)
-               return msg;
+               return MAL_SUCCEED;
        MT_lock_set(&iotLock);
        idx = BSKTnewEntry();
        MT_lock_init(&baskets[idx].lock,"newbasket");
 
-       baskets[idx].schema = GDKstrdup(s->base.name);
-       baskets[idx].table = GDKstrdup(t->base.name);
+       baskets[idx].schema_name = GDKstrdup(s->base.name);
+       baskets[idx].table_name = GDKstrdup(t->base.name);
        baskets[idx].seen = * timestamp_nil;
 
        baskets[idx].count = 0;
        for (o = t->columns.set->h; o; o = o->next)
                baskets[idx].count++;
-       baskets[idx].cols = GDKzalloc((baskets[idx].count + 1) * sizeof(str));
-       baskets[idx].bats = GDKzalloc((baskets[idx].count + 1) * sizeof(BAT *));
        baskets[idx].errors = BATnew(TYPE_void, TYPE_str, BATTINY, TRANSIENT);
-       if (baskets[idx].table == NULL ||
-           baskets[idx].cols == NULL ||
-           baskets[idx].bats == NULL ||
+       if (baskets[idx].table_name == NULL ||
            baskets[idx].errors == NULL) {
                BSKTclean(idx);
                MT_lock_unset(&iotLock);
                throw(MAL,"baskets.register",MAL_MALLOC_FAIL);
        }
 
-       i = 0;
-       for (o = t->columns.set->h; o; o = o->next) {
-               c = o->data;
-               b= BATnew(TYPE_void, c->type.type->localtype, 0, TRANSIENT);
-               if (b == NULL) {
-                       BSKTclean(idx);
-                       MT_lock_unset(&iotLock);
-                       throw(MAL,"baskets.register","Can not locate stream 
column '%s.%s.%s'",s->base.name, t->base.name, c->base.name);
-               }
-               baskets[idx].bats[i] = b->batCacheid;
-               if ((baskets[idx].cols[i++] = GDKstrdup(c->base.name)) == NULL) 
{
-                       BSKTclean(idx);
-                       MT_lock_unset(&iotLock);
-                       throw(MAL,"baskets.register",MAL_MALLOC_FAIL);
-               }
-               BBPkeepref(b->batCacheid);
-       }
+       baskets[idx].schema = s;
+       baskets[idx].table = t;
        MT_lock_unset(&iotLock);
-       return msg;
+       return MAL_SUCCEED;
 }
 
 // MAL/SQL interface for registration of a single table
@@ -201,10 +171,14 @@ BSKTbind(Client cntxt, MalBlkPtr mb, Mal
        str sch = *getArgReference_str(stk,pci,1);
        str tbl = *getArgReference_str(stk,pci,2);
        str col = *getArgReference_str(stk,pci,3);
-       int idx,i;
+       int idx;
+       mvc *m = NULL;
+       sql_schema *s = NULL;
+       sql_table *t = NULL;
+       sql_column *c = NULL;
        BAT *b;
+       str msg;
 
-       (void) cntxt;
        (void) mb;
 
        *ret = 0;
@@ -212,13 +186,21 @@ BSKTbind(Client cntxt, MalBlkPtr mb, Mal
        if (idx <= 0)
                throw(SQL,"iot.bind","Stream table '%s.%s' not 
registered",sch,tbl);
 
-       for(i=0; i < baskets[idx].count; i++)
-               if( strcmp(baskets[idx].cols[i], col)== 0 ){
-                       b= BATdescriptor(baskets[idx].bats[i]);
-                       if( b)
-                               BBPkeepref(*ret =  b->batCacheid);
-                       return MAL_SUCCEED;
-               }
+       msg= getSQLContext(cntxt,NULL, &m, NULL);
+       if( msg != MAL_SUCCEED)
+               return msg;
+       s= mvc_bind_schema(m, sch);
+       if ( s)
+               t= mvc_bind_table(m, s, tbl);
+       if ( t)
+               c= mvc_bind_column(m, t, col);
+
+       if( c){
+               b = store_funcs.bind_col(m->session->tr,c,RD_UPD_VAL);
+               if( b)
+                       BBPkeepref(*ret =  b->batCacheid);
+               return MAL_SUCCEED;
+       }
        throw(SQL,"iot.bind","Stream table column '%s.%s.%s' not 
found",sch,tbl,col);
 }
 
@@ -283,7 +265,7 @@ BSKTreset(void *ret)
        int i;
        (void) ret;
        for (i = 1; i < bsktLimit; i++)
-               if (baskets[i].table)
+               if (baskets[i].table_name)
                        BSKTclean(i);
        return MAL_SUCCEED;
 }
@@ -295,8 +277,9 @@ BSKTpush(Client cntxt, MalBlkPtr mb, Mal
     str sch = *getArgReference_str(stk, pci, 1);
     str tbl = *getArgReference_str(stk, pci, 2);
     str dir = *getArgReference_str(stk, pci, 3);
-    int bskt,i;
+    int bskt;
        char buf[BUFSIZ];
+       node *n;
 
     bskt = BSKTlocate(sch,tbl);
        if (bskt == 0)
@@ -307,14 +290,13 @@ BSKTpush(Client cntxt, MalBlkPtr mb, Mal
                throw(SQL, "iot.push", "Could not access the basket directory 
%s. error %d",dir,errno);
        }
        
-       for(i=0; i < baskets[bskt].count ; i++){
-               snprintf(buf,BUFSIZ, "%s%c%s",dir,DIR_SEP, 
baskets[bskt].cols[i]);
+       for( n = baskets[bskt].table->columns.set->h; n; n= n->next){
+               sql_column *c = n->data;
+               snprintf(buf,BUFSIZ, "%s%c%s",dir,DIR_SEP, c->base.name);
                _DEBUG_BASKET_ mnstr_printf(BSKTout,"Attach the file %s\n",buf);
        }
     (void) cntxt;
     (void) mb;
-    (void) stk;
-    (void) pci;
     return MAL_SUCCEED;
 }
 str
@@ -323,20 +305,30 @@ BSKTdump(void *ret)
        int bskt;
        BUN cnt;
        BAT *b;
+       node *n;
+       sql_column *c;
+       mvc *m = NULL;
+       str msg = MAL_SUCCEED;
 
        mnstr_printf(GDKout, "#baskets table\n");
        for (bskt = 1; bskt < bsktLimit; bskt++)
-               if (baskets[bskt].table) {
+               if (baskets[bskt].table_name) {
+                       msg = getSQLContext(mal_clients, 0, &m, NULL);
+                       if ( msg != MAL_SUCCEED)
+                               break;
                        cnt = 0;
-                       if( baskets[bskt].bats[0]){
-                               b = BBPquickdesc(baskets[bskt].bats[0], TRUE);
-                               if( b)
-                                       cnt = BATcount(b);
+                       n = baskets[bskt].table->columns.set->h;
+                       c = n->data;
+                       b = store_funcs.bind_col(m->session->tr,c,RD_UPD_VAL);
+                       if( b){
+                               cnt = BATcount(b);
+                               BBPunfix(b->batCacheid);
                        }
+
                        mnstr_printf(GDKout, "#baskets[%2d] %s.%s columns %d 
threshold %d window=[%d,%d] time window=[" LLFMT "," LLFMT "] beat " LLFMT " 
milliseconds" BUNFMT"\n",
                                        bskt,
-                                       baskets[bskt].schema,
-                                       baskets[bskt].table,
+                                       baskets[bskt].schema_name,
+                                       baskets[bskt].table_name,
                                        baskets[bskt].count,
                                        baskets[bskt].threshold,
                                        baskets[bskt].winsize,
@@ -348,7 +340,7 @@ BSKTdump(void *ret)
                }
 
        (void) ret;
-       return MAL_SUCCEED;
+       return msg;
 }
 
 str
@@ -364,6 +356,10 @@ BSKTappend(Client cntxt, MalBlkPtr mb, M
 InstrPtr
 BSKTupdateInstruction(MalBlkPtr mb, str sch, str tbl)
 {
+       (void) mb;
+       (void) sch;
+       (void) tbl;
+/*
        int i, j, bskt;
        InstrPtr p;
        BAT *b;
@@ -383,6 +379,8 @@ BSKTupdateInstruction(MalBlkPtr mb, str 
                p = pushArgument(mb, p, j);
        }
        return p;
+*/
+       return NULL;
 }
 
 /* provide a tabular view for inspection */
@@ -437,9 +435,9 @@ BSKTtable(bat *schemaId, bat *nameId, ba
        BATseqbase(timestride, 0);
 
        for (i = 1; i < bsktTop; i++)
-               if (baskets[i].table) {
-                       BUNappend(schema, baskets[i].schema, FALSE);
-                       BUNappend(name, baskets[i].table, FALSE);
+               if (baskets[i].table_name) {
+                       BUNappend(schema, baskets[i].schema_name, FALSE);
+                       BUNappend(name, baskets[i].table_name, FALSE);
                        BUNappend(threshold, &baskets[i].threshold, FALSE);
                        BUNappend(winsize, &baskets[i].winsize, FALSE);
                        BUNappend(winstride, &baskets[i].winstride, FALSE);
@@ -506,7 +504,7 @@ BSKTtableerrors(bat *nameId, bat *errorI
                        BATloop(baskets[i].errors, p, q)
                        {
                                str err = BUNtail(bi, p);
-                               BUNappend(name, &baskets[i].table, FALSE);
+                               BUNappend(name, &baskets[i].table_name, FALSE);
                                BUNappend(error, err, FALSE);
                        }
                }
diff --git a/sql/backends/monet5/iot/basket.h b/sql/backends/monet5/iot/basket.h
--- a/sql/backends/monet5/iot/basket.h
+++ b/sql/backends/monet5/iot/basket.h
@@ -41,15 +41,17 @@
 
 typedef struct{
        MT_Lock lock;
-       str schema;     /* schema for the basket */
-       str table;      /* table that represents the basket */
+       str schema_name;        /* schema for the basket */
+       str table_name; /* table that represents the basket */
+       sql_schema *schema;
+       sql_table *table;
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to