Changeset: 1845207c2511 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/1845207c2511
Modified Files:
	sql/storage/store.c
Branch: iso
Log Message:
merged with Jul2021 diffs (truncated from 892 to 300 lines): diff --git a/sql/storage/bat/bat_storage.c b/sql/storage/bat/bat_storage.c --- a/sql/storage/bat/bat_storage.c +++ b/sql/storage/bat/bat_storage.c @@ -71,8 +71,18 @@ unlock_table(sqlstore *store, sqlid id) MT_lock_unset(&store->table_locks[id&(NR_TABLE_LOCKS-1)]); } -#define lock_column(store, id) lock_table(store, id) -#define unlock_column(store, id) unlock_table(store, id) +static void +lock_column(sqlstore *store, sqlid id) +{ + MT_lock_set(&store->column_locks[id&(NR_TABLE_LOCKS-1)]); +} + +static void +unlock_column(sqlstore *store, sqlid id) +{ + MT_lock_unset(&store->column_locks[id&(NR_TABLE_LOCKS-1)]); +} + static int tc_gc_seg( sql_store Store, sql_change *change, ulng oldest) @@ -218,7 +228,7 @@ rollback_segments(segments *segs, sql_tr seg->next = cur->next; if (cur == segs->t) segs->t = seg; - mark4destroy(cur, change, oldest/* TODO somehow get current timestamp*/); + mark4destroy(cur, change, store_get_timestamp(tr->store)); cur = seg; } else { seg = cur; /* begin of new merge */ @@ -618,6 +628,7 @@ cs_bind_ubat( column_storage *cs, int ac BAT *b; assert(access == RD_UPD_ID || access == RD_UPD_VAL); + /* returns the updates for cs */ if (cs->uibid && cs->uvbid) { if (access == RD_UPD_ID) b = temp_descriptor(cs->uibid); @@ -630,20 +641,145 @@ cs_bind_ubat( column_storage *cs, int ac } static BAT * -bind_ucol(sql_trans *tr, sql_column *c, int access) +merge_updates( BAT *ui, BAT **UV, BAT *oi, BAT *ov) +{ + int err = 0; + BAT *uv = *UV; + BUN cnt = BATcount(ui)+BATcount(oi); + BAT *ni = bat_new(TYPE_oid, cnt, PERSISTENT); + BAT *nv = uv?bat_new(uv->ttype, cnt, PERSISTENT):NULL; + + if (!ni || (uv && !nv)) { + bat_destroy(ni); + bat_destroy(nv); + bat_destroy(ui); + bat_destroy(uv); + bat_destroy(oi); + bat_destroy(ov); + return NULL; + } + BATiter uvi; + BATiter ovi; + + if (uv) { + uvi = bat_iterator(uv); + ovi = bat_iterator(ov); + } + + /* handle dense (void) cases together as we need too 
merge updates (which is slower anyway) */ + BUN uip = 0, uie = BATcount(ui); + BUN oip = 0, oie = BATcount(oi); + + oid uiseqb = ui->tseqbase; + oid oiseqb = oi->tseqbase; + oid *uipt = NULL, *oipt = NULL; + if (!BATtdense(ui)) + uipt = Tloc(ui, 0); + if (!BATtdense(oi)) + oipt = Tloc(oi, 0); + while (uip < uie && oip < oie && !err) { + oid uiid = (uipt)?uipt[uip]: uiseqb+uip; + oid oiid = (oipt)?oipt[oip]: oiseqb+oip; + + if (uiid <= oiid) { + if (BUNappend(ni, (ptr) &uiid, true) != GDK_SUCCEED || + (ov && BUNappend(nv, (ptr) BUNtail(uvi, uip), true) != GDK_SUCCEED)) + err = 1; + uip++; + if (uiid == oiid) + oip++; + } else { /* uiid > oiid */ + if (BUNappend(ni, (ptr) &oiid, true) != GDK_SUCCEED || + (ov && BUNappend(nv, (ptr) BUNtail(ovi, oip), true) != GDK_SUCCEED) ) + err = 1; + oip++; + } + } + while (uip < uie && !err) { + oid uiid = (uipt)?uipt[uip]: uiseqb+uip; + if (BUNappend(ni, (ptr) &uiid, true) != GDK_SUCCEED || + (ov && BUNappend(nv, (ptr) BUNtail(uvi, uip), true) != GDK_SUCCEED)) + err = 1; + uip++; + } + while (oip < oie && !err) { + oid oiid = (oipt)?oipt[oip]: oiseqb+oip; + if (BUNappend(ni, (ptr) &oiid, true) != GDK_SUCCEED || + (ov && BUNappend(nv, (ptr) BUNtail(ovi, oip), true) != GDK_SUCCEED) ) + err = 1; + oip++; + } + bat_destroy(ui); + bat_destroy(uv); + bat_destroy(oi); + bat_destroy(ov); + if (!err) { + if (nv) + *UV = nv; + return ni; + } + *UV = NULL; + bat_destroy(ni); + bat_destroy(nv); + return NULL; +} + +static sql_delta * +older_delta( sql_delta *d, sql_trans *tr) +{ + sql_delta *o = d->next; + + while (o) { + if (o->cs.ucnt && VALID_4_READ(o->cs.ts, tr)) + break; + else + o = o->next; + } + if (o && o->cs.ucnt && VALID_4_READ(o->cs.ts, tr)) + return o; + return NULL; +} + +static BAT * +bind_ubat(sql_trans *tr, sql_delta *d, int access, int type) { assert(tr->active); - sql_delta *d = col_timestamp_delta(tr, c); - return cs_bind_ubat(&d->cs, access, c->type.type->localtype); + sql_delta *o = NULL; + BAT *ui = NULL, *uv = NULL; + 
+ ui = cs_bind_ubat(&d->cs, RD_UPD_ID, type); + if (access == RD_UPD_VAL) + uv = cs_bind_ubat(&d->cs, RD_UPD_VAL, type); + while ((o = older_delta(d, tr)) != NULL) { + BAT *oui = NULL, *ouv = NULL; + if (!oui) + oui = cs_bind_ubat(&o->cs, RD_UPD_ID, type); + if (access == RD_UPD_VAL) + ouv = cs_bind_ubat(&o->cs, RD_UPD_VAL, type); + if (!ui || !oui || (access == RD_UPD_VAL && (!uv || !ouv))) + return NULL; + if ((ui = merge_updates(ui, &uv, oui, ouv)) == NULL) + return NULL; + d = o; + } + if (uv) { + bat_destroy(ui); + return uv; + } + return ui; +} + +static BAT * +bind_ucol(sql_trans *tr, sql_column *c, int access) +{ + return bind_ubat(tr, col_timestamp_delta(tr, c), access, c->type.type->localtype); } static BAT * bind_uidx(sql_trans *tr, sql_idx * i, int access) { int type = oid_index(i->type)?TYPE_oid:TYPE_lng; - assert(tr->active); - sql_delta *d = idx_timestamp_delta(tr, i); - return cs_bind_ubat(&d->cs, access, type); + return bind_ubat(tr, idx_timestamp_delta(tr, i), access, type); } static BAT * @@ -757,11 +893,29 @@ segments_is_append(segment *s, sql_trans } static int +segments_is_deleted(segment *s, sql_trans *tr, oid rid) +{ + for(; s; s=s->next) { + if (s->start <= rid && s->end > rid) { + if (s->ts >= tr->ts && s->deleted) { + return 1; + } + break; + } + } + return 0; +} + +/* + * Returns LOG_OK, LOG_ERR or LOG_CONFLICT + */ +static int cs_update_bat( sql_trans *tr, column_storage *cs, sql_table *t, BAT *tids, BAT *updates, int is_new) { storage *s = ATOMIC_PTR_GET(&t->data); int res = LOG_OK; - BAT *otids = tids; + BAT *otids = tids, *oupdates = updates; + if (!BATcount(tids)) return LOG_OK; @@ -770,107 +924,225 @@ cs_update_bat( sql_trans *tr, column_sto if (!otids) return LOG_ERR; } + /* When we go to smaller grained update structures we should check for concurrent updates on this column ! 
*/ + /* currently only one update delta is possible */ + if (!otids->tsorted) { + BAT *sorted, *order; + if (BATsort(&sorted, &order, NULL, otids, NULL, NULL, false, false, false) != GDK_SUCCEED) { + if (otids != tids) + bat_destroy(otids); + return LOG_ERR; + } + if (otids != tids) + bat_destroy(otids); + otids = sorted; + oupdates = BATproject(order, oupdates); + bat_destroy(order); + } + assert(otids->tsorted); if (!is_new && !cs->cleared) { - BAT *ui, *uv; - - if (cs_real_update_bats(cs, &ui, &uv) != LOG_OK) - return LOG_ERR; + BAT *ui = NULL, *uv = NULL; /* handle updates on just inserted bits */ - if (count_inserts(s->segs->h, tr)) { - segment *seg = s->segs->h; - BUN ucnt = BATcount(otids); - BATiter upi = bat_iterator(updates); - BAT *b; - - if((b = temp_descriptor(cs->bid)) == NULL) { - bat_destroy(ui); - bat_destroy(uv); - if (otids != tids) - bat_destroy(otids); - return LOG_ERR; - } - - if (BATtdense(otids)) { - oid start = otids->tseqbase, offset = start; - oid end = start + ucnt; - for(; seg; seg=seg->next) { - if (seg->start <= start && seg->end > start) { - BUN lend = end < seg->end?end:seg->end; - if (seg->ts == tr->tid && !seg->deleted) { - for (oid rid = start; rid < lend; rid++) { - ptr upd = BUNtail(upi, rid-offset); - if (void_inplace(b, rid, upd, true) != GDK_SUCCEED) { - bat_destroy(b); - bat_destroy(ui); - bat_destroy(uv); - if (otids != tids) - bat_destroy(otids); - return LOG_ERR; - } + /* handle updates on updates (within one transaction) */ + BATiter upi = bat_iterator(oupdates); + BUN cnt = 0, ucnt = BATcount(otids); + BAT *b, *ins = NULL; + int *msk = NULL; + + if((b = temp_descriptor(cs->bid)) == NULL) + res = LOG_ERR; + + if (BATtdense(otids)) { + oid start = otids->tseqbase, offset = start; + oid end = start + ucnt; + + for(segment *seg = s->segs->h; seg && res == LOG_OK ; seg=seg->next) { + if (seg->start <= start && seg->end > start) { + /* check for delete conflicts */ + if (seg->ts >= tr->ts && seg->deleted) { 
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list