Changeset: 1845207c2511 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/1845207c2511
Modified Files:
        sql/storage/store.c
Branch: iso
Log Message:

merged with Jul2021


diffs (truncated from 892 to 300 lines):

diff --git a/sql/storage/bat/bat_storage.c b/sql/storage/bat/bat_storage.c
--- a/sql/storage/bat/bat_storage.c
+++ b/sql/storage/bat/bat_storage.c
@@ -71,8 +71,18 @@ unlock_table(sqlstore *store, sqlid id)
        MT_lock_unset(&store->table_locks[id&(NR_TABLE_LOCKS-1)]);
 }
 
-#define lock_column(store, id) lock_table(store, id)
-#define unlock_column(store, id) unlock_table(store, id)
+static void
+lock_column(sqlstore *store, sqlid id)
+{
+       MT_lock_set(&store->column_locks[id&(NR_TABLE_LOCKS-1)]);
+}
+
+static void
+unlock_column(sqlstore *store, sqlid id)
+{
+       MT_lock_unset(&store->column_locks[id&(NR_TABLE_LOCKS-1)]);
+}
+
 
 static int
 tc_gc_seg( sql_store Store, sql_change *change, ulng oldest)
@@ -218,7 +228,7 @@ rollback_segments(segments *segs, sql_tr
                                seg->next = cur->next;
                                if (cur == segs->t)
                                        segs->t = seg;
-                               mark4destroy(cur, change, oldest/* TODO somehow 
get current timestamp*/);
+                               mark4destroy(cur, change, 
store_get_timestamp(tr->store));
                                cur = seg;
                        } else {
                                seg = cur; /* begin of new merge */
@@ -618,6 +628,7 @@ cs_bind_ubat( column_storage *cs, int ac
        BAT *b;
 
        assert(access == RD_UPD_ID || access == RD_UPD_VAL);
+       /* returns the updates for cs */
        if (cs->uibid && cs->uvbid) {
                if (access == RD_UPD_ID)
                        b = temp_descriptor(cs->uibid);
@@ -630,20 +641,145 @@ cs_bind_ubat( column_storage *cs, int ac
 }
 
 static BAT *
-bind_ucol(sql_trans *tr, sql_column *c, int access)
+merge_updates( BAT *ui, BAT **UV, BAT *oi, BAT *ov)
+{
+       int err = 0;
+       BAT *uv = *UV;
+       BUN cnt = BATcount(ui)+BATcount(oi);
+       BAT *ni = bat_new(TYPE_oid, cnt, PERSISTENT);
+       BAT *nv = uv?bat_new(uv->ttype, cnt, PERSISTENT):NULL;
+
+       if (!ni || (uv && !nv)) {
+               bat_destroy(ni);
+               bat_destroy(nv);
+               bat_destroy(ui);
+               bat_destroy(uv);
+               bat_destroy(oi);
+               bat_destroy(ov);
+               return NULL;
+       }
+       BATiter uvi;
+       BATiter ovi;
+
+       if (uv) {
+               uvi = bat_iterator(uv);
+               ovi = bat_iterator(ov);
+       }
+
+       /* handle dense (void) cases together as we need too merge updates 
(which is slower anyway) */
+       BUN uip = 0, uie = BATcount(ui);
+       BUN oip = 0, oie = BATcount(oi);
+
+       oid uiseqb = ui->tseqbase;
+       oid oiseqb = oi->tseqbase;
+       oid *uipt = NULL, *oipt = NULL;
+       if (!BATtdense(ui))
+               uipt = Tloc(ui, 0);
+       if (!BATtdense(oi))
+               oipt = Tloc(oi, 0);
+       while (uip < uie && oip < oie && !err) {
+               oid uiid = (uipt)?uipt[uip]: uiseqb+uip;
+               oid oiid = (oipt)?oipt[oip]: oiseqb+oip;
+
+               if (uiid <= oiid) {
+                       if (BUNappend(ni, (ptr) &uiid, true) != GDK_SUCCEED ||
+                           (ov && BUNappend(nv, (ptr) BUNtail(uvi, uip), true) 
!= GDK_SUCCEED))
+                               err = 1;
+                       uip++;
+                       if (uiid == oiid)
+                               oip++;
+               } else { /* uiid > oiid */
+                       if (BUNappend(ni, (ptr) &oiid, true) != GDK_SUCCEED ||
+                           (ov && BUNappend(nv, (ptr) BUNtail(ovi, oip), true) 
!= GDK_SUCCEED) )
+                               err = 1;
+                       oip++;
+               }
+       }
+       while (uip < uie && !err) {
+               oid uiid = (uipt)?uipt[uip]: uiseqb+uip;
+               if (BUNappend(ni, (ptr) &uiid, true) != GDK_SUCCEED ||
+                   (ov && BUNappend(nv, (ptr) BUNtail(uvi, uip), true) != 
GDK_SUCCEED))
+                       err = 1;
+               uip++;
+       }
+       while (oip < oie && !err) {
+               oid oiid = (oipt)?oipt[oip]: oiseqb+oip;
+               if (BUNappend(ni, (ptr) &oiid, true) != GDK_SUCCEED ||
+                   (ov && BUNappend(nv, (ptr) BUNtail(ovi, oip), true) != 
GDK_SUCCEED) )
+                       err = 1;
+               oip++;
+       }
+       bat_destroy(ui);
+       bat_destroy(uv);
+       bat_destroy(oi);
+       bat_destroy(ov);
+       if (!err) {
+               if (nv)
+                       *UV = nv;
+               return ni;
+       }
+       *UV = NULL;
+       bat_destroy(ni);
+       bat_destroy(nv);
+       return NULL;
+}
+
+static sql_delta *
+older_delta( sql_delta *d, sql_trans *tr)
+{
+       sql_delta *o = d->next;
+
+       while (o) {
+               if (o->cs.ucnt && VALID_4_READ(o->cs.ts, tr))
+                       break;
+               else
+                       o = o->next;
+       }
+       if (o && o->cs.ucnt && VALID_4_READ(o->cs.ts, tr))
+               return o;
+       return NULL;
+}
+
+static BAT *
+bind_ubat(sql_trans *tr, sql_delta *d, int access, int type)
 {
        assert(tr->active);
-       sql_delta *d = col_timestamp_delta(tr, c);
-       return cs_bind_ubat(&d->cs, access, c->type.type->localtype);
+       sql_delta *o = NULL;
+       BAT *ui = NULL, *uv = NULL;
+
+       ui = cs_bind_ubat(&d->cs, RD_UPD_ID, type);
+       if (access == RD_UPD_VAL)
+               uv = cs_bind_ubat(&d->cs, RD_UPD_VAL, type);
+       while ((o = older_delta(d, tr)) != NULL) {
+               BAT *oui = NULL, *ouv = NULL;
+               if (!oui)
+                       oui = cs_bind_ubat(&o->cs, RD_UPD_ID, type);
+               if (access == RD_UPD_VAL)
+                       ouv = cs_bind_ubat(&o->cs, RD_UPD_VAL, type);
+               if (!ui || !oui || (access == RD_UPD_VAL && (!uv || !ouv)))
+                       return NULL;
+               if ((ui = merge_updates(ui, &uv, oui, ouv)) == NULL)
+                       return NULL;
+               d = o;
+       }
+       if (uv) {
+               bat_destroy(ui);
+               return uv;
+       }
+       return ui;
+}
+
+static BAT *
+bind_ucol(sql_trans *tr, sql_column *c, int access)
+{
+       return bind_ubat(tr, col_timestamp_delta(tr, c), access, 
c->type.type->localtype);
 }
 
 static BAT *
 bind_uidx(sql_trans *tr, sql_idx * i, int access)
 {
        int type = oid_index(i->type)?TYPE_oid:TYPE_lng;
-       assert(tr->active);
-       sql_delta *d = idx_timestamp_delta(tr, i);
-       return cs_bind_ubat(&d->cs, access, type);
+       return bind_ubat(tr, idx_timestamp_delta(tr, i), access, type);
 }
 
 static BAT *
@@ -757,11 +893,29 @@ segments_is_append(segment *s, sql_trans
 }
 
 static int
+segments_is_deleted(segment *s, sql_trans *tr, oid rid)
+{
+       for(; s; s=s->next) {
+               if (s->start <= rid && s->end > rid) {
+                       if (s->ts >= tr->ts && s->deleted) {
+                               return 1;
+                       }
+                       break;
+               }
+       }
+       return 0;
+}
+
+/*
+ * Returns LOG_OK, LOG_ERR or LOG_CONFLICT
+ */
+static int
 cs_update_bat( sql_trans *tr, column_storage *cs, sql_table *t, BAT *tids, BAT 
*updates, int is_new)
 {
        storage *s = ATOMIC_PTR_GET(&t->data);
        int res = LOG_OK;
-       BAT *otids = tids;
+       BAT *otids = tids, *oupdates = updates;
+
        if (!BATcount(tids))
                return LOG_OK;
 
@@ -770,107 +924,225 @@ cs_update_bat( sql_trans *tr, column_sto
                if (!otids)
                        return LOG_ERR;
        }
+       /* When we go to smaller grained update structures we should check for 
concurrent updates on this column ! */
+       /* currently only one update delta is possible */
+       if (!otids->tsorted) {
+               BAT *sorted, *order;
+               if (BATsort(&sorted, &order, NULL, otids, NULL, NULL, false, 
false, false) != GDK_SUCCEED) {
+                       if (otids != tids)
+                               bat_destroy(otids);
+                       return LOG_ERR;
+               }
+               if (otids != tids)
+                       bat_destroy(otids);
+               otids = sorted;
+               oupdates = BATproject(order, oupdates);
+               bat_destroy(order);
+       }
+       assert(otids->tsorted);
        if (!is_new && !cs->cleared) {
-               BAT *ui, *uv;
-
-               if (cs_real_update_bats(cs, &ui, &uv) != LOG_OK)
-                       return LOG_ERR;
+               BAT *ui = NULL, *uv = NULL;
 
                /* handle updates on just inserted bits */
-               if (count_inserts(s->segs->h, tr)) {
-                       segment *seg = s->segs->h;
-                       BUN ucnt = BATcount(otids);
-                       BATiter upi = bat_iterator(updates);
-                       BAT *b;
-
-                       if((b = temp_descriptor(cs->bid)) == NULL) {
-                               bat_destroy(ui);
-                               bat_destroy(uv);
-                               if (otids != tids)
-                                       bat_destroy(otids);
-                               return LOG_ERR;
-                       }
-
-                       if (BATtdense(otids)) {
-                               oid start = otids->tseqbase, offset = start;
-                               oid end = start + ucnt;
-                               for(; seg; seg=seg->next) {
-                                       if (seg->start <= start && seg->end > 
start) {
-                                               BUN lend = end < 
seg->end?end:seg->end;
-                                               if (seg->ts == tr->tid && 
!seg->deleted) {
-                                                       for (oid rid = start; 
rid < lend; rid++) {
-                                                               ptr upd = 
BUNtail(upi, rid-offset);
-                                                               if 
(void_inplace(b, rid, upd, true) != GDK_SUCCEED) {
-                                                                       
bat_destroy(b);
-                                                                       
bat_destroy(ui);
-                                                                       
bat_destroy(uv);
-                                                                       if 
(otids != tids)
-                                                                               
bat_destroy(otids);
-                                                                       return 
LOG_ERR;
-                                                               }
+               /* handle updates on updates (within one transaction) */
+               BATiter upi = bat_iterator(oupdates);
+               BUN cnt = 0, ucnt = BATcount(otids);
+               BAT *b, *ins = NULL;
+               int *msk = NULL;
+
+               if((b = temp_descriptor(cs->bid)) == NULL)
+                       res = LOG_ERR;
+
+               if (BATtdense(otids)) {
+                       oid start = otids->tseqbase, offset = start;
+                       oid end = start + ucnt;
+
+                       for(segment *seg = s->segs->h; seg && res == LOG_OK ; 
seg=seg->next) {
+                               if (seg->start <= start && seg->end > start) {
+                                       /* check for delete conflicts */
+                                       if (seg->ts >= tr->ts && seg->deleted) {
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to