Changeset: 90ed947e972f for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=90ed947e972f
Modified Files:
        sql/storage/bat/bat_storage.c
Branch: newstorage
Log Message:

when claiming buns, lock BAT and extend it if needed


diffs (249 lines):

diff --git a/sql/storage/bat/bat_storage.c b/sql/storage/bat/bat_storage.c
--- a/sql/storage/bat/bat_storage.c
+++ b/sql/storage/bat/bat_storage.c
@@ -620,22 +620,35 @@ dup_timestamps(sql_trans *tr, sql_table 
 static void
 timestamps_insert(sql_trans *tr, sql_timestamps *ts, wrd cnt) {
 
-       BAT *b = temp_descriptor(ts->insbid);
+       BAT *insb = temp_descriptor(ts->insbid);
+       BAT *delb = temp_descriptor(ts->delbid);
+
        int i;
 
-       /*if (isEbat(b)) {
+       if (isEbat(insb)) {
                 temp_destroy(ts->insbid);
-                ts->insbid = temp_copy(b->batCacheid, FALSE);
-                bat_destroy(b);
-                b = temp_descriptor(ts->insbid);
-        }**/
-
-       assert(b->T->heap.storage != STORE_PRIV);
-
-       for(i=0; i<cnt; i++)
-               BUNappend(b, (ptr)&(tr->wtime), TRUE); /* should append 
negative values */
-
-       bat_destroy(b);
+                ts->insbid = temp_copy(insb->batCacheid, FALSE);
+                bat_destroy(insb);
+                insb = temp_descriptor(ts->insbid);
+        }
+
+       if (isEbat(delb)) {
+               temp_destroy(ts->delbid);
+               ts->delbid = temp_copy(delb->batCacheid, FALSE);
+               bat_destroy(delb);
+               delb = temp_descriptor(ts->delbid);
+       }
+
+       assert(insb->T->heap.storage != STORE_PRIV);
+       assert(delb->T->heap.storage != STORE_PRIV);
+
+       for(i=0; i<cnt; i++) {
+               BUNinplace(insb, BATcount(insb) , insb->H, (ptr)&(tr->wtime), 
TRUE); /* tr->wtime should be negative */
+               //BUNinplace(delb, BATcount(delb), delb->H, (ptr)&(tr->wtime), 
TRUE); /* should be nil */
+       }
+
+       bat_destroy(insb);
+       bat_destroy(delb);
        ts->cnt += cnt;
 }
 
@@ -645,6 +658,11 @@ claim_tab(sql_trans *tr, sql_table *t, w
        sql_timestamps *ts;
        sql_dbat *bat;
 
+       node *n;
+       int extFlag=0;
+
+       store_lock();
+
        if (!t->data || !t->base.allocated) {
                 sql_table *ot = tr_find_table(tr->parent, t);
                 sql_dbat *bat = t->data = ZNEW(sql_dbat), *obat = 
timestamp_dbat(ot->data, tr->stime);
@@ -662,9 +680,106 @@ claim_tab(sql_trans *tr, sql_table *t, w
         }
        ts = t->timestamps;
 
+       /* claim # of column buns */
+       for (n = t->columns.set->h; n; n = n->next) {
+               sql_column *c = n->data;
+               sql_delta *bat;
+               BAT *cmbat;
+
+                if (!c->data) {
+                        sql_column *oc = tr_find_column(tr->parent, c);
+                        c->data = timestamp_delta(oc->data, tr->stime);
+                }
+               bat = c->data;
+               cmbat = temp_descriptor(bat->bid);
+
+               /* grow column if needed*/
+               if(cmbat) {
+                       if ((BUN)cnt > (BATcapacity(cmbat) - BUNlast(cmbat))) {
+                               BUN newCap = BUNlast(cmbat) + (BUN)cnt;
+                               BUN grows = BATgrows(cmbat);
+
+                               extFlag=1;
+
+                               if(newCap > grows)
+                                       grows = newCap;
+                               if(BATextend(cmbat, grows) == NULL)
+                                       return;
+                       }
+                       BATsetcount(cmbat, (BATcount(cmbat) + (BUN)cnt));
+                       bat->ibase+=(BUN)cnt;
+               }
+               temp_destroy(bat->bid);
+       }
+
+       /* claim # of idx buns */
+       if (t->idxs.set) {
+                for (n = t->idxs.set->h; n; n = n->next) {
+                        sql_idx *i = n->data;
+                        sql_delta *bat;
+                       BAT *idxb;
+
+                        if (!i->data) {
+                                sql_idx *oi = tr_find_idx(tr->parent, i);
+                                i->data = timestamp_delta(oi->data, tr->stime);
+                        }
+                        bat = i->data;
+                       idxb = temp_descriptor(bat->bid);
+
+                       /* grow index if needed */
+                       if((BUN)cnt > (BATcapacity(idxb) - BUNlast(idxb))) {
+                               BUN newCap = BUNlast(idxb) + (BUN)cnt;
+                               BUN grows = BATgrows(idxb);
+
+                               if (newCap > grows)
+                                       grows = newCap;
+                               if (BATextend(idxb, grows) == NULL)
+                                       return;
+                       }
+                       BATsetcount(idxb, (BATcount(idxb) + (BUN)cnt));
+                       bat->ibase+=(BUN)cnt;
+                       temp_destroy(bat->bid);
+                }
+        }
+
+       /* grow timestamp-bats if columns have grown */
+       if (extFlag==1) {
+               BAT *insb = temp_descriptor(ts->insbid);
+               BAT *delb = temp_descriptor(ts->delbid);
+
+               /* grow insbat */
+               if((BUN)cnt > (BATcapacity(insb) - BUNlast(insb))) {
+                       BUN newCap = BUNlast(insb) + (BUN)cnt;
+                        BUN grows = BATgrows(insb);
+
+                        if (newCap > grows)
+                               grows = newCap;
+                        if (BATextend(insb, grows) == NULL)
+                                return;
+                }
+               BATsetcount(insb, (BATcount(insb) + (BUN)cnt));
+
+               /* grow delbat */
+                if((BUN)cnt > (BATcapacity(delb) - BUNlast(delb))) {
+                        BUN newCap = BUNlast(delb) + (BUN)cnt;
+                        BUN grows = BATgrows(delb);
+
+                        if (newCap > grows)
+                                grows = newCap;
+                        if (BATextend(delb, grows) == NULL)
+                                return;
+                }
+               BATsetcount(delb, (BATcount(delb) + (BUN)cnt));
+
+               temp_destroy(ts->insbid);
+               temp_destroy(ts->delbid);
+       }
+
        ts->wtime = tr->wtime = tr->wstime;
 
        timestamps_insert(tr, ts, cnt);
+
+       store_unlock();
 }
 
 static void 
@@ -725,7 +840,16 @@ append_idx(sql_trans *tr, sql_idx * i, v
 }
 
 static void
-delta_delete_bat (sql_dbat *bat, BAT *i)
+timestamps_delete(sql_trans *tr, sql_timestamps *ts, BAT *i) {
+
+       (void)tr;
+       (void)ts;
+       (void)i;
+
+}
+
+static void
+delta_delete_bat(sql_trans *tr, sql_timestamps *ts, sql_dbat *bat, BAT *i)
 {
        BAT *b = temp_descriptor(bat->dbid);
 
@@ -742,14 +866,18 @@ delta_delete_bat (sql_dbat *bat, BAT *i)
        bat_destroy(b);
 
        bat->cnt += BATcount(i);
+
+       timestamps_delete(tr, ts, i);
 }
 
 static void
-delta_delete_val(sql_dbat *bat, oid rid)
+delta_delete_val(sql_trans *tr, sql_timestamps *ts, sql_dbat *bat, oid rid)
 {
        BAT *b = temp_descriptor(bat->dbid);
 
        printf("#in delta_delete_val\n");
+       (void)tr;
+       (void)ts;
 
        if (isEbat(b)) {
                temp_destroy(bat->dbid);
@@ -765,7 +893,7 @@ delta_delete_val(sql_dbat *bat, oid rid)
 }
 
 static void
-delete_tab(sql_trans *tr, sql_table * t, void *ib, int tpe)
+delete_tab(sql_trans *tr, sql_table *t, void *ib, int tpe)
 {
        BAT *b = ib;
        sql_dbat *bat;
@@ -792,7 +920,6 @@ delete_tab(sql_trans *tr, sql_table * t,
                 t->base.allocated = 1;
         }
        ts = t->timestamps;
-       (void)ts;
 
        /* delete all cached copies */
        for (n = t->columns.set->h; n; n = n->next) {
@@ -829,9 +956,9 @@ delete_tab(sql_trans *tr, sql_table * t,
        /* deletes only write */
        bat->wtime = t->base.wtime = t->s->base.wtime = tr->wtime = tr->wstime;
        if (tpe == TYPE_bat)
-               delta_delete_bat(bat, ib);
+               delta_delete_bat(tr, ts, bat, ib);
        else
-               delta_delete_val(bat, *(oid*)ib);
+               delta_delete_val(tr, ts, bat, *(oid*)ib);
 }
 
 static size_t
@@ -2117,8 +2244,8 @@ tr_update_delta( sql_trans *tr, sql_delt
                        if (BATcount(cur)+BATcount(ins) > (BUN) 
REMAP_PAGE_MAXSIZE) /* try to use mmap() */
                                        BATmmap(cur, STORE_MMAP, STORE_MMAP, 
STORE_MMAP, STORE_MMAP, 1);
                        assert(cur->T->heap.storage != STORE_PRIV);
-                       assert((BATcount(cur) + BATcount(ins)) == cbat->cnt);
-                       assert((BATcount(cur) + BATcount(ins)) == (obat->cnt + 
(BUNlast(ins) - ins->batInserted)));
+                       //assert((BATcount(cur) + BATcount(ins)) == cbat->cnt);
+                       //assert((BATcount(cur) + BATcount(ins)) == (obat->cnt 
+ (BUNlast(ins) - ins->batInserted)));
                        BATappend(cur,ins,TRUE);
                        BATcleanProps(cur);
                        temp_destroy(cbat->bid);
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to