Changeset: 720479de31fe for MonetDB URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=720479de31fe Modified Files: sql/backends/monet5/sql_basket.c Branch: trails Log Message:
More clean up of the basket structure and some information for the code diffs (145 lines): diff --git a/sql/backends/monet5/sql_basket.c b/sql/backends/monet5/sql_basket.c --- a/sql/backends/monet5/sql_basket.c +++ b/sql/backends/monet5/sql_basket.c @@ -38,6 +38,19 @@ BasketRec *baskets; /* the global timetrails catalog */ static int bsktTop = 0, bsktLimit = 0; +// Initialise a basket, but leave the caller to deal with the lock +#define BSKTinit(idx) \ + baskets[idx].table = NULL; \ + baskets[idx].cols = NULL; \ + baskets[idx].bats = NULL; \ + baskets[idx].ncols = 0; \ + baskets[idx].count = 0; \ + baskets[idx].window = 0; \ + baskets[idx].stride = STRIDE_ALL; \ + baskets[idx].seen = timestamp_nil; \ + baskets[idx].events = 0; \ + baskets[idx].error = NULL; + // locate the basket in the basket catalog int BSKTlocate(str sch, str tbl) @@ -73,6 +86,9 @@ BSKTnewEntry(void) return 0; bsktLimit += INTIAL_BSKT; baskets = bnew; + for (i = bsktLimit-INTIAL_BSKT; i < bsktLimit; i++){ + BSKTinit(i); + } } for (i = 1; i < bsktLimit; i++) { /* find an available slot */ if (baskets[i].table == NULL) @@ -103,13 +119,17 @@ BSKTclean(int idx) BBPunfix(baskets[idx].bats[i]->batCacheid); baskets[idx].bats[i] =NULL; } + baskets[i].ncols = 0; GDKfree(baskets[idx].bats); GDKfree(baskets[idx].cols); + baskets[idx].bats = NULL; + baskets[idx].cols = NULL; MT_lock_destroy(&baskets[idx].lock); } } // MAL/SQL interface for registration of a single table +// Create the internal basket structure before we're going to use this stream table str BSKTregisterInternal(Client cntxt, MalBlkPtr mb, str sch, str tbl, int* res) { @@ -146,6 +166,13 @@ BSKTregisterInternal(Client cntxt, MalBl if((idx = BSKTnewEntry()) < 1) throw(MAL,"basket.register",SQLSTATE(HY013) MAL_MALLOC_FAIL); + /* Since we just created a new basket entry to register this stream + * table, when anything goes wrong below, we reset this basket to its initial + * state. We destroy its lock here, because it's a new entry and no + * lock has been set (basket.register is called before basket.lock). */ + // FIXME: BSKTregisterInternal is also called by functions other than + // basket.register, which shouldn't destroy the lock. But + // they should have been detected as double registration. baskets[idx].table = t; baskets[idx].window = t->stream->window; baskets[idx].stride = t->stream->stride; @@ -158,6 +185,7 @@ BSKTregisterInternal(Client cntxt, MalBl int tpe = col->type.type->localtype; if ( !(tpe <= TYPE_str || tpe == TYPE_date || tpe == TYPE_daytime || tpe == TYPE_timestamp) ) { + BSKTinit(idx); MT_lock_destroy(&baskets[idx].lock); throw(MAL,"basket.register",SQLSTATE(42000) "Unsupported type %d\n",tpe); } @@ -166,13 +194,14 @@ BSKTregisterInternal(Client cntxt, MalBl baskets[idx].ncols = colcnt; baskets[idx].bats = GDKmalloc(colcnt * sizeof(BAT **)); if(baskets[idx].bats == NULL) { + BSKTinit(idx); MT_lock_destroy(&baskets[idx].lock); throw(MAL,"basket.register",SQLSTATE(HY013) MAL_MALLOC_FAIL); } baskets[idx].cols = GDKmalloc(colcnt * sizeof(sql_column **)); if(baskets[idx].cols == NULL) { + BSKTinit(idx); MT_lock_destroy(&baskets[idx].lock); - GDKfree(baskets[idx].bats); throw(MAL,"basket.register",SQLSTATE(HY013) MAL_MALLOC_FAIL); } @@ -234,6 +263,7 @@ BSKTwindow(Client cntxt, MalBlkPtr mb, M return MAL_SUCCEED; } +// keep and release should be used as a pair str BSKTkeep(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) { @@ -467,6 +497,7 @@ BSKTtumble(Client cntxt, MalBlkPtr mb, M return BSKTtumbleInternal(cntxt, sch, tbl, idx, elw, elm); } +// A no-op to handle an SQL COMMIT statement involving a stream table. str BSKTcommit(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) { @@ -516,7 +547,10 @@ BSKTunlock(Client cntxt, MalBlkPtr mb, M if( idx ==0) throw(SQL,"basket.lock",SQLSTATE(3F000) "Stream table %s.%s not accessible\n",sch,tbl); - /* this is also the place to administer the size of the basket */ + /* this is also the place to administer the size of the basket, + * i.e. set it to the count of remaining tuples for the next query on + * this stream table. + */ b = BSKTbindColumn(sch,tbl, baskets[idx].cols[0]->base.name); if( b) baskets[idx].count = BATcount(b); @@ -701,6 +735,7 @@ BSKTdelete(Client cntxt, MalBlkPtr mb, M return MAL_SUCCEED; } +// Reset a busket so that it can be used anew str BSKTreset(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) { @@ -714,8 +749,9 @@ BSKTreset(Client cntxt, MalBlkPtr mb, Ma *res = 0; idx = BSKTlocate(sname,tname); - if( idx <= 0) + if( idx == 0) throw(SQL,"basket.reset",SQLSTATE(3F000) "Stream table %s.%s not registered\n",sname,tname); + // FIXME: should we really set and unset the backset lock here? // do actual work MT_lock_set(&baskets[idx].lock); for( i=0; i < baskets[idx].ncols; i++){ @@ -745,7 +781,7 @@ BSKTerror(Client cntxt, MalBlkPtr mb, Ma (void) mb; idx = BSKTlocate(sname,tname); - if( idx <= 0) + if( idx == 0) throw(SQL,"basket.error",SQLSTATE(3F000) "Stream table %s.%s not registered\n",sname,tname); if(error) { _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list