Changeset: 26b684dad92c for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=26b684dad92c
Modified Files:
	monetdb5/optimizer/opt_iot.c
	sql/backends/monet5/iot/Tests/iot00.sql
	sql/backends/monet5/iot/basket.c
Branch: iot
Log Message:
Fix cleaning out the basket diffs (203 lines): diff --git a/monetdb5/optimizer/opt_iot.c b/monetdb5/optimizer/opt_iot.c --- a/monetdb5/optimizer/opt_iot.c +++ b/monetdb5/optimizer/opt_iot.c @@ -52,9 +52,11 @@ OPTiotImplementation(Client cntxt, MalBl int done[MAXBSKT]= {0}; int btop=0; int noerror=0; + int cq; (void) pci; + cq= strncmp(getFunctionId(getInstrPtr(mb,0)),"cq",2) == 0; old = mb->stmt; limit = mb->stop; slimit = mb->ssize; @@ -162,13 +164,14 @@ OPTiotImplementation(Client cntxt, MalBl if( getModuleId(p)== iotRef && getFunctionId(p)==errorRef) noerror++; if (p->token == ENDsymbol && btop > 0 && noerror==0) { - // empty all baskets used - for( j=0; j<btop; j++) + // empty all baskets used only when we are optimizing a cq + for( j=0; cq && j<btop; j++) if( done[j]==0) { p= newStmt(mb,basketRef,finishRef); p= pushStr(mb,p, schemas[j]); p= pushStr(mb,p, tables[j]); } + /* catch any exception left behind */ r = newAssignment(mb); j = getArg(r, 0) = newVariable(mb, GDKstrdup("SQLexception"), TYPE_str); diff --git a/sql/backends/monet5/iot/Tests/iot00.sql b/sql/backends/monet5/iot/Tests/iot00.sql --- a/sql/backends/monet5/iot/Tests/iot00.sql +++ b/sql/backends/monet5/iot/Tests/iot00.sql @@ -11,7 +11,7 @@ begin end; call iot.query('iot','cq00'); -call iot.query('insert into iot.result select min(t), count(*), avg(val) from iot.stmp;'); +--call iot.query('insert into iot.result select min(t), count(*), avg(val) from iot.stmp;'); select * from iot.baskets(); select * from iot.queries(); diff --git a/sql/backends/monet5/iot/basket.c b/sql/backends/monet5/iot/basket.c --- a/sql/backends/monet5/iot/basket.c +++ b/sql/backends/monet5/iot/basket.c @@ -331,7 +331,7 @@ BSKTbindColumn(Client cntxt, str sch, st if( c) - b = store_funcs.bind_col(m->session->tr,c,RDONLY); + b = store_funcs.bind_col(m->session->tr,c,RD_INS); return b; } @@ -423,7 +423,7 @@ BSKTpushBasket(Client cntxt, MalBlkPtr m _DEBUG_BASKET_ mnstr_printf(BSKTout,"Attach the file %s\n",buf); if( access 
(buf,R_OK)) throw(MAL,"iot.basket","Could not access the column %s file %s\n",c->base.name, buf); - b = store_funcs.bind_col(m->session->tr,c,RDONLY); + b = store_funcs.bind_col(m->session->tr,c,RD_INS); if( b == 0) throw(MAL,"iot.basket","Could not access the column %s\n",c->base.name); } @@ -442,7 +442,7 @@ BSKTpushBasket(Client cntxt, MalBlkPtr m (void) fseek(f,0, SEEK_END); fsize = ftell(f); rewind(f); - b = store_funcs.bind_col(m->session->tr,c,RDONLY); + b = store_funcs.bind_col(m->session->tr,c,RD_INS); assert( b); bcnt = BATcount(b); @@ -492,7 +492,7 @@ BSKTpushBasket(Client cntxt, MalBlkPtr m /* check for mis-aligned columns */ for( n = baskets[bskt].table->columns.set->h; msg == MAL_SUCCEED && n; n= n->next){ sql_column *c = n->data; - b = store_funcs.bind_col(m->session->tr,c,RDONLY); + b = store_funcs.bind_col(m->session->tr,c,RD_INS); assert( b ); if( first){ first = 0; @@ -517,7 +517,7 @@ recover: if( msg != MAL_SUCCEED) for( n = baskets[bskt].table->columns.set->h; n; n= n->next){ sql_column *c = n->data; - b = store_funcs.bind_col(m->session->tr,c,RDONLY); + b = store_funcs.bind_col(m->session->tr,c,RD_INS); assert( b ); BATsetcount(b,0); BBPunfix(b->batCacheid); @@ -565,7 +565,7 @@ BSKTfinish(Client cntxt, MalBlkPtr mb, M shiftcolumns: for( n = baskets[bskt].table->columns.set->h; n; n= n->next){ sql_column *c = n->data; - b = store_funcs.bind_col(m->session->tr,c,RDONLY); + b = store_funcs.bind_col(m->session->tr,c,RD_INS); assert( b ); cnt=BATcount(b); if( cnt < stride) @@ -605,7 +605,7 @@ shiftcolumns: lng *first, *last, stop; n = baskets[bskt].table->columns.set->h; c = n->data; - b = store_funcs.bind_col(m->session->tr,c,RDONLY); + b = store_funcs.bind_col(m->session->tr,c,RD_INS); assert( b ); if( b->ttype !=TYPE_timestamp) throw(SQL, "iot.finish", "Could not find the leading 'iotclk' in %s.%s",sch,tbl); @@ -621,7 +621,7 @@ shiftcolumns: /* default action: reset all stream BATs to empty*/ for( n = baskets[bskt].table->columns.set->h; n; n= 
n->next){ sql_column *c = n->data; - b = store_funcs.bind_col(m->session->tr,c,RDONLY); + b = store_funcs.bind_col(m->session->tr,c,RD_INS); assert( b ); // use the proper basket policy BATsetcount(b,0); @@ -651,7 +651,7 @@ BSKTdump(void *ret) cnt = 0; n = baskets[bskt].table->columns.set->h; c = n->data; - b = store_funcs.bind_col(m->session->tr,c,RDONLY); + b = store_funcs.bind_col(m->session->tr,c,RD_INS); if( b){ cnt = BATcount(b); BBPunfix(b->batCacheid); @@ -684,26 +684,18 @@ BSKTappend(Client cntxt, MalBlkPtr mb, M str sname = *getArgReference_str(stk, pci, 2); str tname = *getArgReference_str(stk, pci, 3); str cname = *getArgReference_str(stk, pci, 4); - ptr ins = getArgReference(stk, pci, 5); + ptr value = getArgReference(stk, pci, 5); int tpe = getArgType(mb, pci, 5); sql_schema *s; sql_table *t; sql_column *c; - BAT *bn=0, *b = 0; + BAT *bn=0, *binsert = 0; *res = 0; if ((msg = getSQLContext(cntxt, mb, &m, NULL)) != NULL) return msg; if ((msg = checkSQLContext(cntxt)) != NULL) return msg; - if (tpe > GDKatomcnt) - tpe = TYPE_bat; - if (tpe == TYPE_bat && (ins = BATdescriptor(*(int *) ins)) == NULL) - throw(SQL, "basket.append", "Cannot access descriptor"); - if (ATOMextern(tpe)) - ins = *(ptr *) ins; - if ( tpe == TYPE_bat) - b = (BAT*) ins; s = mvc_bind_schema(m, sname); if (s == NULL) @@ -712,18 +704,24 @@ BSKTappend(Client cntxt, MalBlkPtr mb, M if ( t) c= mvc_bind_column(m, t, cname); else throw(SQL,"basket.append","Stream table %s.%s not accessible for append\n",sname,tname); - if( c) { - bn = store_funcs.bind_col(m->session->tr,c,RDONLY); - if( bn){ - if( tpe == TYPE_bat) - BATappend(bn, b, TRUE); - else BUNappend(bn, ins, TRUE); - BBPunfix(bn->batCacheid); - } - } else throw(SQL,"basket.append","Stream column %s.%s.%s not accessible for append\n",sname,tname,cname); - if (tpe == TYPE_bat) { - BBPunfix(((BAT *) ins)->batCacheid); - } + if( c == NULL) + throw(SQL,"basket.append","Stream column %s.%s.%s not accessible for 
append\n",sname,tname,cname); + + if ( isaBatType(tpe) && (binsert = BATdescriptor(*(int *) value)) == NULL) + throw(SQL, "basket.append", "Cannot access source descriptor"); + if ( ATOMextern(tpe)) + value = *(ptr*) value; + + bn = store_funcs.bind_col(m->session->tr,c,RD_INS); + if( bn){ + if (binsert) + BATappend(bn, binsert, TRUE); + else + BUNappend(bn, value, TRUE); + BBPunfix(bn->batCacheid); + } else throw(SQL, "basket.append", "Cannot access target descriptor"); + if (binsert ) + BBPunfix(((BAT *) binsert)->batCacheid); return MAL_SUCCEED; } @@ -787,7 +785,7 @@ BSKTclear(Client cntxt, MalBlkPtr mb, Ma for( i=0; baskets[idx].cols[i]; i++){ c= mvc_bind_column(m, t, baskets[idx].cols[i]); if( c){ - b = store_funcs.bind_col(m->session->tr,c,RDONLY); + b = store_funcs.bind_col(m->session->tr,c,RD_INS); if(b){ BATsetcount(b,0); BBPunfix(b->batCacheid); _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list