Changeset: 720479de31fe for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=720479de31fe
Modified Files:
        sql/backends/monet5/sql_basket.c
Branch: trails
Log Message:

More clean up of the basket structure and some information for the code


diffs (145 lines):

diff --git a/sql/backends/monet5/sql_basket.c b/sql/backends/monet5/sql_basket.c
--- a/sql/backends/monet5/sql_basket.c
+++ b/sql/backends/monet5/sql_basket.c
@@ -38,6 +38,19 @@
 BasketRec *baskets;   /* the global timetrails catalog */
 static int bsktTop = 0, bsktLimit = 0;
 
+// Initialise a basket, but leave the caller to deal with the lock
+#define BSKTinit(idx)                       \
+       baskets[idx].table = NULL;          \
+       baskets[idx].cols = NULL;           \
+       baskets[idx].bats = NULL;           \
+       baskets[idx].ncols = 0;             \
+       baskets[idx].count = 0;             \
+       baskets[idx].window = 0;            \
+       baskets[idx].stride = STRIDE_ALL;   \
+       baskets[idx].seen = timestamp_nil;  \
+       baskets[idx].events = 0;            \
+       baskets[idx].error = NULL;
+
 // locate the basket in the basket catalog
 int
 BSKTlocate(str sch, str tbl)
@@ -73,6 +86,9 @@ BSKTnewEntry(void)
                        return 0;
                bsktLimit += INTIAL_BSKT;
                baskets = bnew;
+               for (i = bsktLimit-INTIAL_BSKT; i < bsktLimit; i++){
+                       BSKTinit(i);
+               }
        }
        for (i = 1; i < bsktLimit; i++) { /* find an available slot */
                if (baskets[i].table == NULL)
@@ -103,13 +119,17 @@ BSKTclean(int idx)
                        BBPunfix(baskets[idx].bats[i]->batCacheid);
                        baskets[idx].bats[i] =NULL;
                }
+               baskets[i].ncols = 0;
                GDKfree(baskets[idx].bats);
                GDKfree(baskets[idx].cols);
+               baskets[idx].bats = NULL;
+               baskets[idx].cols = NULL;
                MT_lock_destroy(&baskets[idx].lock);
        }
 }
 
 // MAL/SQL interface for registration of a single table
+// Create the internal basket structure before we're going to use this stream 
table
 str
 BSKTregisterInternal(Client cntxt, MalBlkPtr mb, str sch, str tbl, int* res)
 {
@@ -146,6 +166,13 @@ BSKTregisterInternal(Client cntxt, MalBl
        if((idx = BSKTnewEntry()) < 1)
                throw(MAL,"basket.register",SQLSTATE(HY013) MAL_MALLOC_FAIL);
 
+       /* Since we just created a new basket entry to register this stream
+        * table, when anything goes wrong below, we reset this basket to its 
initial
+        * state. We destroy its lock here, because it's a new entry and no
+        * lock has been set (basket.register is called before basket.lock). */
+       // FIXME: BSKTregisterInternal is also called by functions other than
+       //          basket.register, which shouldn't destroy the lock.  But
+       //          they should have been detected as double registration.
        baskets[idx].table = t;
        baskets[idx].window = t->stream->window;
        baskets[idx].stride = t->stream->stride;
@@ -158,6 +185,7 @@ BSKTregisterInternal(Client cntxt, MalBl
                int tpe = col->type.type->localtype;
 
                if ( !(tpe <= TYPE_str || tpe == TYPE_date || tpe == 
TYPE_daytime || tpe == TYPE_timestamp) ) {
+                       BSKTinit(idx);
                        MT_lock_destroy(&baskets[idx].lock);
                        throw(MAL,"basket.register",SQLSTATE(42000) 
"Unsupported type %d\n",tpe);
                }
@@ -166,13 +194,14 @@ BSKTregisterInternal(Client cntxt, MalBl
        baskets[idx].ncols = colcnt;
        baskets[idx].bats = GDKmalloc(colcnt * sizeof(BAT **));
        if(baskets[idx].bats == NULL) {
+               BSKTinit(idx);
                MT_lock_destroy(&baskets[idx].lock);
                throw(MAL,"basket.register",SQLSTATE(HY013) MAL_MALLOC_FAIL);
        }
        baskets[idx].cols = GDKmalloc(colcnt * sizeof(sql_column **));
        if(baskets[idx].cols == NULL) {
+               BSKTinit(idx);
                MT_lock_destroy(&baskets[idx].lock);
-               GDKfree(baskets[idx].bats);
                throw(MAL,"basket.register",SQLSTATE(HY013) MAL_MALLOC_FAIL);
        }
 
@@ -234,6 +263,7 @@ BSKTwindow(Client cntxt, MalBlkPtr mb, M
        return MAL_SUCCEED;
 }
 
+// keep and release should be used as a pair
 str
 BSKTkeep(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
 {
@@ -467,6 +497,7 @@ BSKTtumble(Client cntxt, MalBlkPtr mb, M
        return BSKTtumbleInternal(cntxt, sch, tbl, idx, elw, elm);
 }
 
+// A no-op to handle an SQL COMMIT statement involving a stream table.
 str
 BSKTcommit(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
 {
@@ -516,7 +547,10 @@ BSKTunlock(Client cntxt, MalBlkPtr mb, M
        if( idx ==0)
                throw(SQL,"basket.lock",SQLSTATE(3F000) "Stream table %s.%s not 
accessible\n",sch,tbl);
 
-       /* this is also the place to administer the size of the basket */
+       /* this is also the place to administer the size of the basket,
+        *   i.e. set it to the count of remaining tuples for the next query on
+        *   this stream table.
+        */
        b = BSKTbindColumn(sch,tbl, baskets[idx].cols[0]->base.name);
        if( b)
                baskets[idx].count = BATcount(b);
@@ -701,6 +735,7 @@ BSKTdelete(Client cntxt, MalBlkPtr mb, M
        return MAL_SUCCEED;
 }
 
+// Reset a busket so that it can be used anew
 str
 BSKTreset(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
 {
@@ -714,8 +749,9 @@ BSKTreset(Client cntxt, MalBlkPtr mb, Ma
 
        *res = 0;
        idx = BSKTlocate(sname,tname);
-       if( idx <= 0)
+       if( idx == 0)
                throw(SQL,"basket.reset",SQLSTATE(3F000) "Stream table %s.%s 
not registered\n",sname,tname);
+       // FIXME: should we really set and unset the backset lock here?
        // do actual work
        MT_lock_set(&baskets[idx].lock);
        for( i=0; i < baskets[idx].ncols; i++){
@@ -745,7 +781,7 @@ BSKTerror(Client cntxt, MalBlkPtr mb, Ma
        (void) mb;
 
        idx = BSKTlocate(sname,tname);
-       if( idx <= 0)
+       if( idx == 0)
                throw(SQL,"basket.error",SQLSTATE(3F000) "Stream table %s.%s 
not registered\n",sname,tname);
 
        if(error) {
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to