Changeset: 70f3e075a7bb for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=70f3e075a7bb Modified Files: sql/storage/bat/bat_storage.c sql/storage/bat/bat_table.c sql/storage/sql_storage.h sql/storage/store.c Branch: Jul2015 Log Message:
fixed concurrency issues 1) keep tables aligned, when insert are followed by update transactions we should apply all changes at once (ie when we are the only transaction) or none (ie when we are not alone) 2) keep ibase set also in the optimized route (where we move i-nsert bat) 3) do not throw away delta's to early diffs (truncated from 352 to 300 lines): diff --git a/sql/storage/bat/bat_storage.c b/sql/storage/bat/bat_storage.c --- a/sql/storage/bat/bat_storage.c +++ b/sql/storage/bat/bat_storage.c @@ -20,7 +20,7 @@ timestamp_delta( sql_delta *d, int ts) { while (d->next && d->wtime > ts) d = d->next; - if (0 && d && d->cached) { + if (d && d->cached) { bat_destroy(d->cached); d->cached = NULL; } @@ -32,7 +32,7 @@ timestamp_dbat( sql_dbat *d, int ts) { while (d->next && d->wtime > ts) d = d->next; - if (0 && d && d->cached) { + if (d && d->cached) { bat_destroy(d->cached); d->cached = NULL; } @@ -494,6 +494,7 @@ delta_append_bat( sql_delta *bat, BAT *i bat->ibid = id; temp_dup(id); bat_destroy(b); + i->hseqbase = bat->ibase; } else { if (!isEbat(b)){ assert(b->T->heap.storage != STORE_PRIV); @@ -1871,7 +1872,7 @@ tr_update_delta( sql_trans *tr, sql_delt return ok; } ins = temp_descriptor(cbat->ibid); - if (unique) + if (0 && unique) BATkey(BATmirror(cur), TRUE); /* any inserts */ if (BUNlast(ins) > BUNfirst(ins) || cleared) { @@ -1879,7 +1880,7 @@ tr_update_delta( sql_trans *tr, sql_delt /* swap cur and ins */ BAT *newcur = ins; - if (unique) + if (0 && unique) BATkey(BATmirror(newcur), TRUE); temp_destroy(cbat->bid); temp_destroy(obat->bid); @@ -1940,6 +1941,78 @@ tr_update_delta( sql_trans *tr, sql_delt return ok; } +static int +tr_merge_delta( sql_trans *tr, sql_delta *obat, int unique) +{ + int ok = LOG_OK; + BAT *ins, *cur = NULL; + int cleared = 0; + + (void)tr; + assert(store_nr_active==1); + assert (obat->bid != 0 || tr != gtrans); + + if (obat->cached) { + bat_destroy(obat->cached); + obat->cached = NULL; + } + if (obat->bid) + cur = temp_descriptor(obat->bid); + ins = temp_descriptor(obat->ibid); + if (0 && unique) + BATkey(BATmirror(cur), TRUE); + /* any inserts */ + if (BUNlast(ins) > BUNfirst(ins) || cleared) { + if ((!obat->ibase && BATcount(ins) > SNAPSHOT_MINSIZE)){ + /* swap cur and ins */ + BAT *newcur = ins; + bat id = obat->bid; + + if (0 && unique) + BATkey(BATmirror(newcur), TRUE); + obat->bid = obat->ibid; + obat->ibid = id; + + BATmsync(ins); + ins = cur; + cur = newcur; + } else { + BATappend(cur,ins,TRUE); + BATcleanProps(cur); + if (cur->batPersistence == PERSISTENT) + BATmsync(cur); + } + obat->cnt = obat->ibase = BATcount(cur); + temp_destroy(obat->ibid); + obat->ibid = e_bat(cur->ttype); + } + bat_destroy(ins); + + if (obat->ucnt || cleared) { + BAT *ui = temp_descriptor(obat->uibid); + BAT *uv = temp_descriptor(obat->uvbid); + + /* any updates */ + if (BUNlast(ui) > BUNfirst(ui)) { + void_replace_bat(cur, ui, uv, TRUE); + /* cleanup the old deltas */ + temp_destroy(obat->uibid); + temp_destroy(obat->uvbid); + obat->uibid = e_bat(TYPE_oid); + obat->uvbid = e_bat(cur->ttype); + obat->ucnt = 0; + } + bat_destroy(ui); + bat_destroy(uv); + } + bat_destroy(cur); + if (obat->next) { + ok = destroy_bat(tr, obat->next); + obat->next = NULL; + } + return ok; +} + static int tr_update_dbat(sql_trans *tr, sql_dbat *tdb, sql_dbat *fdb, int cleared) { @@ -1983,6 +2056,23 @@ tr_update_dbat(sql_trans *tr, sql_dbat * } static int +tr_merge_dbat(sql_trans *tr, sql_dbat *tdb) +{ + int ok = LOG_OK; + + if (tdb->cached) { + bat_destroy(tdb->cached); + tdb->cached = NULL; + } + assert(store_nr_active==1); + if (tdb->next) { + ok = destroy_dbat(tr, tdb->next); + tdb->next = NULL; + } + return ok; +} + +static int update_table(sql_trans *tr, sql_table *ft, sql_table *tt) { sql_trans *oldest = active_transactions->h->data; @@ -2006,8 +2096,8 @@ update_table(sql_trans *tr, sql_table *f } } - if (ft->base.allocated) { - if (store_nr_active > 1) { /* move delta */ + if (store_nr_active == 1 || ft->base.allocated) { + if (store_nr_active > 1 && ft->data) { /* move delta */ sql_dbat *b = ft->data; ft->data = NULL; @@ -2020,14 +2110,16 @@ update_table(sql_trans *tr, sql_table *f } while (b && b->wtime >= oldest->stime) b = b->next; - if (b && b->wtime < oldest->stime) { + if (0 && b && b->wtime < oldest->stime) { /* anything older can go */ destroy_dbat(tr, b->next); b->next = NULL; } - } else if (tt->base.allocated) { + } else if (ft->base.allocated) { tr_update_dbat(tr, tt->data, ft->data, ft->cleared); - } else { + } else if (store_nr_active == 1 && !ft->base.allocated) { + tr_merge_dbat(tr, tt->data); + } else if (ft->data) { tt->data = ft->data; tt->base.allocated = 1; } @@ -2036,28 +2128,31 @@ update_table(sql_trans *tr, sql_table *f sql_column *cc = n->data; sql_column *oc = m->data; - if (cc->base.wtime && cc->base.allocated) { - assert(oc->base.wtime < cc->base.wtime); - if (store_nr_active > 1) { /* move delta */ + if (store_nr_active == 1 || (cc->base.wtime && cc->base.allocated)) { + assert(!cc->base.wtime || oc->base.wtime < cc->base.wtime); + if (store_nr_active > 1 && cc->data) { /* move delta */ sql_delta *b = cc->data; cc->data = NULL; b->next = oc->data; oc->data = b; + if (b->cached) { bat_destroy(b->cached); b->cached = NULL; } while (b && b->wtime >= oldest->stime) b = b->next; - if (b && b->wtime < oldest->stime) { + if (0 && b && b->wtime < oldest->stime) { /* anything older can go */ destroy_bat(tr, b->next); b->next = NULL; } - } else if (oc->base.allocated) { + } else if (cc->base.allocated) { tr_update_delta(tr, oc->data, cc->data, cc->unique == 1); - } else { + } else if (store_nr_active == 1 && !cc->base.allocated) { + tr_merge_delta(tr, oc->data, oc->unique == 1); + } else if (cc->data) { oc->data = cc->data; oc->base.allocated = 1; } @@ -2088,34 +2183,37 @@ update_table(sql_trans *tr, sql_table *f sql_idx *oi = m->data; /* some indices have no bats */ - if (!oi->data || !ci->base.wtime || !ci->base.allocated) { + if (!oi->data) { ci->data = NULL; ci->base.allocated = ci->base.rtime = ci->base.wtime = 0; continue; } - if (store_nr_active > 1) { /* move delta */ - sql_delta *b = ci->data; + if (store_nr_active == 1 || (ci->base.wtime && ci->base.allocated)) { + if (store_nr_active > 1 && ci->data) { /* move delta */ + sql_delta *b = ci->data; - ci->data = NULL; - b->next = oi->data; - oi->data = b; - if (b->cached) { - bat_destroy(b->cached); - b->cached = NULL; + ci->data = NULL; + b->next = oi->data; + oi->data = b; + if (b->cached) { + bat_destroy(b->cached); + b->cached = NULL; + } + while (b && b->wtime >= oldest->stime) + b = b->next; + if (0 && b && b->wtime < oldest->stime) { + /* anything older can go */ + destroy_bat(tr, b->next); + b->next = NULL; + } + } else if (ci->base.allocated) { + tr_update_delta(tr, oi->data, ci->data, 0); + } else if (store_nr_active == 1 && !ci->base.allocated) { + tr_merge_delta(tr, oi->data, 0); + } else if (ci->data) { + oi->data = ci->data; + oi->base.allocated = 1; } - while (b && b->wtime >= oldest->stime) - b = b->next; - if (b && b->wtime < oldest->stime) { - /* anything older can go */ - destroy_bat(tr, b->next); - b->next = NULL; - } - } else if (oi->base.allocated) { - assert(oi->base.allocated); - tr_update_delta(tr, oi->data, ci->data, 0); - } else { - oi->data = ci->data; - oi->base.allocated = 1; } if (oi->base.rtime < ci->base.rtime) diff --git a/sql/storage/bat/bat_table.c b/sql/storage/bat/bat_table.c --- a/sql/storage/bat/bat_table.c +++ b/sql/storage/bat/bat_table.c @@ -53,10 +53,10 @@ delta_cands(sql_trans *tr, sql_table *t) t->data = timestamp_dbat(ot->data, tr->stime); } d = t->data; - if (d->cached /*&& !tr->parent*/) + if (!store_initialized && d->cached) return temp_descriptor(d->cached->batCacheid); tids = _delta_cands(tr, t); - if (!d->cached /*&& !tr->parent*/) /* only cache during catalog loading */ + if (!store_initialized && !d->cached) /* only cache during catalog loading */ d->cached = temp_descriptor(tids->batCacheid); return tids; } @@ -104,7 +104,7 @@ delta_full_bat_( sql_trans *tr, sql_colu bat_destroy(uv); } (void)c; - if (!bat->cached /*&& !tr->parent*/) _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list