Changeset: 26b684dad92c for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=26b684dad92c
Modified Files:
        monetdb5/optimizer/opt_iot.c
        sql/backends/monet5/iot/Tests/iot00.sql
        sql/backends/monet5/iot/basket.c
Branch: iot
Log Message:

Fix cleaning out the basket


diffs (203 lines):

diff --git a/monetdb5/optimizer/opt_iot.c b/monetdb5/optimizer/opt_iot.c
--- a/monetdb5/optimizer/opt_iot.c
+++ b/monetdb5/optimizer/opt_iot.c
@@ -52,9 +52,11 @@ OPTiotImplementation(Client cntxt, MalBl
        int done[MAXBSKT]= {0};
        int btop=0;
        int noerror=0;
+       int cq;
 
        (void) pci;
 
+       cq= strncmp(getFunctionId(getInstrPtr(mb,0)),"cq",2) == 0;
        old = mb->stmt;
        limit = mb->stop;
        slimit = mb->ssize;
@@ -162,13 +164,14 @@ OPTiotImplementation(Client cntxt, MalBl
                        if( getModuleId(p)== iotRef && 
getFunctionId(p)==errorRef)
                                noerror++;
                        if (p->token == ENDsymbol && btop > 0 && noerror==0) {
-                               // empty all baskets used
-                               for( j=0; j<btop; j++)
+                               // empty all baskets used only when we are 
optimizing a cq
+                               for( j=0; cq && j<btop; j++)
                                if( done[j]==0) {
                                        p= newStmt(mb,basketRef,finishRef);
                                        p= pushStr(mb,p, schemas[j]);
                                        p= pushStr(mb,p, tables[j]);
                                }
+
                                /* catch any exception left behind */
                                r = newAssignment(mb);
                                j = getArg(r, 0) = newVariable(mb, 
GDKstrdup("SQLexception"), TYPE_str);
diff --git a/sql/backends/monet5/iot/Tests/iot00.sql 
b/sql/backends/monet5/iot/Tests/iot00.sql
--- a/sql/backends/monet5/iot/Tests/iot00.sql
+++ b/sql/backends/monet5/iot/Tests/iot00.sql
@@ -11,7 +11,7 @@ begin
 end;
 
 call iot.query('iot','cq00');
-call iot.query('insert into iot.result select min(t), count(*), avg(val) from 
iot.stmp;');
+--call iot.query('insert into iot.result select min(t), count(*), avg(val) 
from iot.stmp;');
 
 select * from  iot.baskets();
 select * from  iot.queries();
diff --git a/sql/backends/monet5/iot/basket.c b/sql/backends/monet5/iot/basket.c
--- a/sql/backends/monet5/iot/basket.c
+++ b/sql/backends/monet5/iot/basket.c
@@ -331,7 +331,7 @@ BSKTbindColumn(Client cntxt, str sch, st
 
 
        if( c)
-               b = store_funcs.bind_col(m->session->tr,c,RDONLY);
+               b = store_funcs.bind_col(m->session->tr,c,RD_INS);
        return b;
 }
 
@@ -423,7 +423,7 @@ BSKTpushBasket(Client cntxt, MalBlkPtr m
                _DEBUG_BASKET_ mnstr_printf(BSKTout,"Attach the file %s\n",buf);
                if( access (buf,R_OK))
                        throw(MAL,"iot.basket","Could not access the column %s 
file %s\n",c->base.name, buf);
-               b = store_funcs.bind_col(m->session->tr,c,RDONLY);
+               b = store_funcs.bind_col(m->session->tr,c,RD_INS);
                if( b == 0)
                        throw(MAL,"iot.basket","Could not access the column 
%s\n",c->base.name);
        }
@@ -442,7 +442,7 @@ BSKTpushBasket(Client cntxt, MalBlkPtr m
                (void) fseek(f,0, SEEK_END);
                fsize = ftell(f);
                rewind(f);
-               b = store_funcs.bind_col(m->session->tr,c,RDONLY);
+               b = store_funcs.bind_col(m->session->tr,c,RD_INS);
                assert( b);
                bcnt = BATcount(b);
 
@@ -492,7 +492,7 @@ BSKTpushBasket(Client cntxt, MalBlkPtr m
        /* check for mis-aligned columns */
        for( n = baskets[bskt].table->columns.set->h; msg == MAL_SUCCEED && n; 
n= n->next){
                sql_column *c = n->data;
-               b = store_funcs.bind_col(m->session->tr,c,RDONLY);
+               b = store_funcs.bind_col(m->session->tr,c,RD_INS);
                assert( b );
                if( first){
                        first = 0;
@@ -517,7 +517,7 @@ recover:
        if( msg != MAL_SUCCEED)
        for( n = baskets[bskt].table->columns.set->h; n; n= n->next){
                sql_column *c = n->data;
-               b = store_funcs.bind_col(m->session->tr,c,RDONLY);
+               b = store_funcs.bind_col(m->session->tr,c,RD_INS);
                assert( b );
                BATsetcount(b,0);
                BBPunfix(b->batCacheid);
@@ -565,7 +565,7 @@ BSKTfinish(Client cntxt, MalBlkPtr mb, M
 shiftcolumns:
                for( n = baskets[bskt].table->columns.set->h; n; n= n->next){
                        sql_column *c = n->data;
-                       b = store_funcs.bind_col(m->session->tr,c,RDONLY);
+                       b = store_funcs.bind_col(m->session->tr,c,RD_INS);
                        assert( b );
                        cnt=BATcount(b);
                        if( cnt < stride)
@@ -605,7 +605,7 @@ shiftcolumns:
                lng *first, *last, stop;
                n = baskets[bskt].table->columns.set->h; 
                c = n->data;
-               b = store_funcs.bind_col(m->session->tr,c,RDONLY);
+               b = store_funcs.bind_col(m->session->tr,c,RD_INS);
                assert( b );
                if( b->ttype !=TYPE_timestamp)
                        throw(SQL, "iot.finish", "Could not find the leading 
'iotclk' in %s.%s",sch,tbl);
@@ -621,7 +621,7 @@ shiftcolumns:
        /* default action: reset all stream BATs to empty*/
        for( n = baskets[bskt].table->columns.set->h; n; n= n->next){
                sql_column *c = n->data;
-               b = store_funcs.bind_col(m->session->tr,c,RDONLY);
+               b = store_funcs.bind_col(m->session->tr,c,RD_INS);
                assert( b );
                // use the proper basket policy
                BATsetcount(b,0);
@@ -651,7 +651,7 @@ BSKTdump(void *ret)
                        cnt = 0;
                        n = baskets[bskt].table->columns.set->h;
                        c = n->data;
-                       b = store_funcs.bind_col(m->session->tr,c,RDONLY);
+                       b = store_funcs.bind_col(m->session->tr,c,RD_INS);
                        if( b){
                                cnt = BATcount(b);
                                BBPunfix(b->batCacheid);
@@ -684,26 +684,18 @@ BSKTappend(Client cntxt, MalBlkPtr mb, M
     str sname = *getArgReference_str(stk, pci, 2);
     str tname = *getArgReference_str(stk, pci, 3);
     str cname = *getArgReference_str(stk, pci, 4);
-    ptr ins = getArgReference(stk, pci, 5);
+    ptr value = getArgReference(stk, pci, 5);
     int tpe = getArgType(mb, pci, 5);
     sql_schema *s;
     sql_table *t;
     sql_column *c;
-    BAT *bn=0, *b = 0;
+    BAT *bn=0, *binsert = 0;
 
     *res = 0;
     if ((msg = getSQLContext(cntxt, mb, &m, NULL)) != NULL)
         return msg;
     if ((msg = checkSQLContext(cntxt)) != NULL)
         return msg;
-    if (tpe > GDKatomcnt)
-        tpe = TYPE_bat;
-    if (tpe == TYPE_bat && (ins = BATdescriptor(*(int *) ins)) == NULL)
-        throw(SQL, "basket.append", "Cannot access descriptor");
-    if (ATOMextern(tpe))
-        ins = *(ptr *) ins;
-    if ( tpe == TYPE_bat)
-        b =  (BAT*) ins;
 
     s = mvc_bind_schema(m, sname);
     if (s == NULL)
@@ -712,18 +704,24 @@ BSKTappend(Client cntxt, MalBlkPtr mb, M
        if ( t)
                c= mvc_bind_column(m, t, cname);
        else throw(SQL,"basket.append","Stream table %s.%s not accessible for 
append\n",sname,tname);
-       if( c) {
-               bn = store_funcs.bind_col(m->session->tr,c,RDONLY);
-               if( bn){
-                       if( tpe == TYPE_bat)
-                               BATappend(bn, b, TRUE);
-                       else BUNappend(bn, ins, TRUE);
-                       BBPunfix(bn->batCacheid);
-               }
-       } else throw(SQL,"basket.append","Stream column %s.%s.%s not accessible 
for append\n",sname,tname,cname);
-       if (tpe == TYPE_bat) {
-               BBPunfix(((BAT *) ins)->batCacheid);
-       }
+       if( c == NULL) 
+               throw(SQL,"basket.append","Stream column %s.%s.%s not 
accessible for append\n",sname,tname,cname);
+
+    if ( isaBatType(tpe) && (binsert = BATdescriptor(*(int *) value)) == NULL)
+        throw(SQL, "basket.append", "Cannot access source descriptor");
+       if ( ATOMextern(tpe))
+               value = *(ptr*) value;
+
+       bn = store_funcs.bind_col(m->session->tr,c,RD_INS);
+       if( bn){
+               if (binsert)
+                       BATappend(bn, binsert, TRUE);
+               else
+                       BUNappend(bn, value, TRUE);
+               BBPunfix(bn->batCacheid);
+       } else throw(SQL, "basket.append", "Cannot access target descriptor");
+       if (binsert )
+               BBPunfix(((BAT *) binsert)->batCacheid);
        return MAL_SUCCEED;
 }
 
@@ -787,7 +785,7 @@ BSKTclear(Client cntxt, MalBlkPtr mb, Ma
        for( i=0; baskets[idx].cols[i]; i++){
                c= mvc_bind_column(m, t, baskets[idx].cols[i]);
                if( c){
-                       b = store_funcs.bind_col(m->session->tr,c,RDONLY);
+                       b = store_funcs.bind_col(m->session->tr,c,RD_INS);
                        if(b){
                                BATsetcount(b,0);
                                BBPunfix(b->batCacheid);
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to