Changeset: 70f3e075a7bb for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=70f3e075a7bb
Modified Files:
        sql/storage/bat/bat_storage.c
        sql/storage/bat/bat_table.c
        sql/storage/sql_storage.h
        sql/storage/store.c
Branch: Jul2015
Log Message:

fixed concurrency issues
1) keep tables aligned: when inserts are followed by update transactions we
should apply all changes at once (i.e. when we are the only transaction) or none
(i.e. when we are not alone)
2) keep ibase set also in the optimized route (where we move the insert bat)
3) do not throw away deltas too early


diffs (truncated from 352 to 300 lines):

diff --git a/sql/storage/bat/bat_storage.c b/sql/storage/bat/bat_storage.c
--- a/sql/storage/bat/bat_storage.c
+++ b/sql/storage/bat/bat_storage.c
@@ -20,7 +20,7 @@ timestamp_delta( sql_delta *d, int ts)
 {
        while (d->next && d->wtime > ts) 
                d = d->next;
-       if (0 && d && d->cached) {
+       if (d && d->cached) {
                bat_destroy(d->cached);
                d->cached = NULL;
        }
@@ -32,7 +32,7 @@ timestamp_dbat( sql_dbat *d, int ts)
 {
        while (d->next && d->wtime > ts) 
                d = d->next;
-       if (0 && d && d->cached) {
+       if (d && d->cached) {
                bat_destroy(d->cached);
                d->cached = NULL;
        }
@@ -494,6 +494,7 @@ delta_append_bat( sql_delta *bat, BAT *i
                bat->ibid = id;
                temp_dup(id);
                bat_destroy(b);
+               i->hseqbase = bat->ibase;
        } else {
                if (!isEbat(b)){
                        assert(b->T->heap.storage != STORE_PRIV);
@@ -1871,7 +1872,7 @@ tr_update_delta( sql_trans *tr, sql_delt
                return ok;
        }
        ins = temp_descriptor(cbat->ibid);
-       if (unique)
+       if (0 && unique)
                BATkey(BATmirror(cur), TRUE);
        /* any inserts */
        if (BUNlast(ins) > BUNfirst(ins) || cleared) {
@@ -1879,7 +1880,7 @@ tr_update_delta( sql_trans *tr, sql_delt
                        /* swap cur and ins */
                        BAT *newcur = ins;
 
-                       if (unique)
+                       if (0 && unique)
                                BATkey(BATmirror(newcur), TRUE);
                        temp_destroy(cbat->bid);
                        temp_destroy(obat->bid);
@@ -1940,6 +1941,78 @@ tr_update_delta( sql_trans *tr, sql_delt
        return ok;
 }
 
+static int 
+tr_merge_delta( sql_trans *tr, sql_delta *obat, int unique)
+{
+       int ok = LOG_OK;
+       BAT *ins, *cur = NULL;
+       int cleared = 0;
+
+       (void)tr;
+       assert(store_nr_active==1);
+       assert (obat->bid != 0 || tr != gtrans);
+
+       if (obat->cached) {
+               bat_destroy(obat->cached);
+               obat->cached = NULL;
+       }
+       if (obat->bid)
+               cur = temp_descriptor(obat->bid);
+       ins = temp_descriptor(obat->ibid);
+       if (0 && unique)
+               BATkey(BATmirror(cur), TRUE);
+       /* any inserts */
+       if (BUNlast(ins) > BUNfirst(ins) || cleared) {
+               if ((!obat->ibase && BATcount(ins) > SNAPSHOT_MINSIZE)){
+                       /* swap cur and ins */
+                       BAT *newcur = ins;
+                       bat id = obat->bid;
+
+                       if (0 && unique)
+                               BATkey(BATmirror(newcur), TRUE);
+                       obat->bid = obat->ibid;
+                       obat->ibid = id;
+
+                       BATmsync(ins);
+                       ins = cur;
+                       cur = newcur;
+               } else {
+                       BATappend(cur,ins,TRUE);
+                       BATcleanProps(cur);
+                       if (cur->batPersistence == PERSISTENT)
+                               BATmsync(cur);
+               }
+               obat->cnt = obat->ibase = BATcount(cur);
+               temp_destroy(obat->ibid);
+               obat->ibid = e_bat(cur->ttype);
+       }
+       bat_destroy(ins);
+
+       if (obat->ucnt || cleared) {
+               BAT *ui = temp_descriptor(obat->uibid);
+               BAT *uv = temp_descriptor(obat->uvbid);
+
+               /* any updates */
+               if (BUNlast(ui) > BUNfirst(ui)) {
+                       void_replace_bat(cur, ui, uv, TRUE);
+                       /* cleanup the old deltas */
+                       temp_destroy(obat->uibid);
+                       temp_destroy(obat->uvbid);
+                       obat->uibid = e_bat(TYPE_oid);
+                       obat->uvbid = e_bat(cur->ttype);
+                       obat->ucnt = 0;
+               }
+               bat_destroy(ui);
+               bat_destroy(uv);
+       }
+       bat_destroy(cur);
+       if (obat->next) { 
+               ok = destroy_bat(tr, obat->next);
+               obat->next = NULL;
+       }
+       return ok;
+}
+
 static int
 tr_update_dbat(sql_trans *tr, sql_dbat *tdb, sql_dbat *fdb, int cleared)
 {
@@ -1983,6 +2056,23 @@ tr_update_dbat(sql_trans *tr, sql_dbat *
 }
 
 static int
+tr_merge_dbat(sql_trans *tr, sql_dbat *tdb)
+{
+       int ok = LOG_OK;
+
+       if (tdb->cached) {
+               bat_destroy(tdb->cached);
+               tdb->cached = NULL;
+       }
+       assert(store_nr_active==1);
+       if (tdb->next) {
+               ok = destroy_dbat(tr, tdb->next);
+               tdb->next = NULL;
+       }
+       return ok;
+}
+
+static int
 update_table(sql_trans *tr, sql_table *ft, sql_table *tt)
 {
        sql_trans *oldest = active_transactions->h->data;
@@ -2006,8 +2096,8 @@ update_table(sql_trans *tr, sql_table *f
                }
        }
 
-       if (ft->base.allocated) {
-               if (store_nr_active > 1) { /* move delta */
+       if (store_nr_active == 1 || ft->base.allocated) {
+               if (store_nr_active > 1 && ft->data) { /* move delta */
                        sql_dbat *b = ft->data;
 
                        ft->data = NULL;
@@ -2020,14 +2110,16 @@ update_table(sql_trans *tr, sql_table *f
                        }
                        while (b && b->wtime >= oldest->stime)
                                b = b->next;
-                       if (b && b->wtime < oldest->stime) {
+                       if (0 && b && b->wtime < oldest->stime) {
                                /* anything older can go */
                                destroy_dbat(tr, b->next);
                                b->next = NULL;
                        }
-               } else if (tt->base.allocated) {
+               } else if (ft->base.allocated) {
                        tr_update_dbat(tr, tt->data, ft->data, ft->cleared);
-               } else {
+               } else if (store_nr_active == 1 && !ft->base.allocated) {
+                       tr_merge_dbat(tr, tt->data);
+               } else if (ft->data) {
                        tt->data = ft->data;
                        tt->base.allocated = 1;
                }
@@ -2036,28 +2128,31 @@ update_table(sql_trans *tr, sql_table *f
                sql_column *cc = n->data;
                sql_column *oc = m->data;
 
-               if (cc->base.wtime && cc->base.allocated) {
-                       assert(oc->base.wtime < cc->base.wtime);
-                       if (store_nr_active > 1) { /* move delta */
+               if (store_nr_active == 1 || (cc->base.wtime && 
cc->base.allocated)) {
+                       assert(!cc->base.wtime || oc->base.wtime < 
cc->base.wtime);
+                       if (store_nr_active > 1 && cc->data) { /* move delta */
                                sql_delta *b = cc->data;
 
                                cc->data = NULL;
                                b->next = oc->data;
                                oc->data = b;
+
                                if (b->cached) {
                                        bat_destroy(b->cached);
                                        b->cached = NULL;
                                }
                                while (b && b->wtime >= oldest->stime) 
                                        b = b->next;
-                               if (b && b->wtime < oldest->stime) {
+                               if (0 && b && b->wtime < oldest->stime) {
                                        /* anything older can go */
                                        destroy_bat(tr, b->next);
                                        b->next = NULL;
                                }
-                       } else if (oc->base.allocated) {
+                       } else if (cc->base.allocated) {
                                tr_update_delta(tr, oc->data, cc->data, 
cc->unique == 1);
-                       } else {
+                       } else if (store_nr_active == 1 && !cc->base.allocated) 
{
+                               tr_merge_delta(tr, oc->data, oc->unique == 1);
+                       } else if (cc->data) {
                                oc->data = cc->data; 
                                oc->base.allocated = 1;
                        }
@@ -2088,34 +2183,37 @@ update_table(sql_trans *tr, sql_table *f
                        sql_idx *oi = m->data;
 
                        /* some indices have no bats */
-                       if (!oi->data || !ci->base.wtime || 
!ci->base.allocated) {
+                       if (!oi->data) {
                                ci->data = NULL;
                                ci->base.allocated = ci->base.rtime = 
ci->base.wtime = 0;
                                continue;
                        }
-                       if (store_nr_active > 1) { /* move delta */
-                               sql_delta *b = ci->data;
+                       if (store_nr_active == 1 || (ci->base.wtime && 
ci->base.allocated)) {
+                               if (store_nr_active > 1 && ci->data) { /* move 
delta */
+                                       sql_delta *b = ci->data;
 
-                               ci->data = NULL;
-                               b->next = oi->data;
-                               oi->data = b;
-                               if (b->cached) {
-                                       bat_destroy(b->cached);
-                                       b->cached = NULL;
+                                       ci->data = NULL;
+                                       b->next = oi->data;
+                                       oi->data = b;
+                                       if (b->cached) {
+                                               bat_destroy(b->cached);
+                                               b->cached = NULL;
+                                       }
+                                       while (b && b->wtime >= oldest->stime) 
+                                               b = b->next;
+                                       if (0 && b && b->wtime < oldest->stime) 
{
+                                               /* anything older can go */
+                                               destroy_bat(tr, b->next);
+                                               b->next = NULL;
+                                       }
+                               } else if (ci->base.allocated) {
+                                       tr_update_delta(tr, oi->data, ci->data, 
0);
+                               } else if (store_nr_active == 1 && 
!ci->base.allocated) {
+                                       tr_merge_delta(tr, oi->data, 0);
+                               } else if (ci->data) {
+                                       oi->data = ci->data;
+                                       oi->base.allocated = 1;
                                }
-                               while (b && b->wtime >= oldest->stime) 
-                                       b = b->next;
-                               if (b && b->wtime < oldest->stime) {
-                                       /* anything older can go */
-                                       destroy_bat(tr, b->next);
-                                       b->next = NULL;
-                               }
-                       } else if (oi->base.allocated) {
-                               assert(oi->base.allocated);
-                               tr_update_delta(tr, oi->data, ci->data, 0);
-                       } else {
-                               oi->data = ci->data;
-                               oi->base.allocated = 1;
                        }
 
                        if (oi->base.rtime < ci->base.rtime)
diff --git a/sql/storage/bat/bat_table.c b/sql/storage/bat/bat_table.c
--- a/sql/storage/bat/bat_table.c
+++ b/sql/storage/bat/bat_table.c
@@ -53,10 +53,10 @@ delta_cands(sql_trans *tr, sql_table *t)
                t->data = timestamp_dbat(ot->data, tr->stime);
        }
        d = t->data;
-       if (d->cached /*&& !tr->parent*/) 
+       if (!store_initialized && d->cached) 
                return temp_descriptor(d->cached->batCacheid);
        tids = _delta_cands(tr, t);
-       if (!d->cached /*&& !tr->parent*/) /* only cache during catalog loading 
*/
+       if (!store_initialized && !d->cached) /* only cache during catalog 
loading */
                d->cached = temp_descriptor(tids->batCacheid);
        return tids;
 }
@@ -104,7 +104,7 @@ delta_full_bat_( sql_trans *tr, sql_colu
                bat_destroy(uv); 
        }
        (void)c;
-       if (!bat->cached /*&& !tr->parent*/) 
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to