Changeset: 4cb531b29855 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/4cb531b29855
Modified Files:
        sql/storage/bat/bat_storage.c
Branch: scatter
Log Message:

merged with jul2021


diffs (273 lines):

diff --git a/sql/storage/bat/bat_storage.c b/sql/storage/bat/bat_storage.c
--- a/sql/storage/bat/bat_storage.c
+++ b/sql/storage/bat/bat_storage.c
@@ -77,7 +77,7 @@ unlock_table(sqlstore *store, sqlid id)
 static int
 tc_gc_seg( sql_store Store, sql_change *change, ulng oldest)
 {
-       (void)Store; 
+       (void)Store;
        segment *s = change->data;
 
        if (s->ts <= oldest) {
@@ -1010,7 +1010,7 @@ bind_col_data(sql_trans *tr, sql_column 
        if (isTempTable(c->t))
                obat = temp_col_timestamp_delta(tr, c);
 
-       if (obat->cs.ts == tr->tid || !update_conflict) /* on append there are no conflicts */
+       if (obat->cs.ts == tr->tid || ((obat->cs.ts < TRANSACTION_ID_BASE || tr_version_of_parent(tr, obat->cs.ts)) && !update_conflict)) /* on append there are no conflicts */
                return obat;
        if ((!tr->parent || !tr_version_of_parent(tr, obat->cs.ts)) && obat->cs.ts >= TRANSACTION_ID_BASE && !isTempTable(c->t)) {
                /* abort */
@@ -1075,7 +1075,7 @@ bind_idx_data(sql_trans *tr, sql_idx *i,
        if (isTempTable(i->t))
                obat = temp_idx_timestamp_delta(tr, i);
 
-       if (obat->cs.ts == tr->tid || !update_conflict)
+       if (obat->cs.ts == tr->tid || ((obat->cs.ts < TRANSACTION_ID_BASE || tr_version_of_parent(tr, obat->cs.ts)) && !update_conflict)) /* on append there are no conflicts */
                return obat;
        if ((!tr->parent || !tr_version_of_parent(tr, obat->cs.ts)) && obat->cs.ts >= TRANSACTION_ID_BASE && !isTempTable(i->t)) {
                /* abort */
@@ -1404,8 +1404,17 @@ destroy_storage(storage *bat)
        return ok;
 }
 
+static int
+segments_conflict(sql_trans *tr, segments *segs)
+{
+       for (segment *s = segs->h; s; s = s->next)
+               if (!VALID_4_READ(s->ts,tr))
+                       return 1;
+       return 0;
+}
+
 static storage *
-bind_del_data(sql_trans *tr, sql_table *t)
+bind_del_data(sql_trans *tr, sql_table *t, bool *clear)
 {
        storage *obat = ATOMIC_PTR_GET(&t->data);
 
@@ -1414,18 +1423,26 @@ bind_del_data(sql_trans *tr, sql_table *
 
        if (obat->cs.ts == tr->tid)
                return obat;
-       if ((!tr->parent || !tr_version_of_parent(tr, obat->cs.ts)) && obat->cs.ts >= TRANSACTION_ID_BASE && !isTempTable(t))
+       if ((!tr->parent || !tr_version_of_parent(tr, obat->cs.ts)) && obat->cs.ts >= TRANSACTION_ID_BASE && !isTempTable(t)) {
                /* abort */
+               if (clear)
+                       *clear = true;
                return NULL;
-       if (!isTempTable(t))
+       }
+       if (!isTempTable(t) && !clear)
                return obat;
+       if (!isTempTable(t) && clear && segments_conflict(tr, obat->segs)) {
+               *clear = true;
+               return NULL;
+       }
+
        assert(!isTempTable(t));
        obat = timestamp_storage(tr, ATOMIC_PTR_GET(&t->data));
        storage *bat = ZNEW(storage);
        if(!bat)
                return NULL;
        bat->cs.refcnt = 1;
-       dup_storage(tr, obat, bat, isTempTable(t));
+       dup_storage(tr, obat, bat, clear || isTempTable(t) /* for clear and temp create empty storage */);
        bat->cs.ts = tr->tid;
        /* only one writer else abort */
        bat->next = obat;
@@ -1447,7 +1464,7 @@ delete_tab(sql_trans *tr, sql_table * t,
        if (tpe == TYPE_bat && !BATcount(b))
                return ok;
 
-       if ((bat = bind_del_data(tr, t)) == NULL)
+       if ((bat = bind_del_data(tr, t, NULL)) == NULL)
                return LOG_ERR;
 
        lock_table(tr->store, t->base.id);
@@ -2298,10 +2315,11 @@ clear_cs(sql_trans *tr, column_storage *
 static BUN
 clear_col(sql_trans *tr, sql_column *c)
 {
+       bool update_conflict = false;
        sql_delta *delta, *odelta = ATOMIC_PTR_GET(&c->data);
 
-       if ((delta = bind_col_data(tr, c, NULL)) == NULL)
-               return BUN_NONE;
+       if ((delta = bind_col_data(tr, c, &update_conflict)) == NULL)
+               return update_conflict ? LOG_CONFLICT : LOG_ERR;
        if ((!inTransaction(tr, c->t) && (odelta != delta || isTempTable(c->t)) && isGlobal(c->t)) || (!isNew(c->t) && isLocalTemp(c->t)))
                trans_add(tr, &c->base, delta, &tc_gc_col, &commit_update_col, isLocalTemp(c->t)?NULL:&log_update_col);
        if (delta)
@@ -2312,12 +2330,13 @@ clear_col(sql_trans *tr, sql_column *c)
 static BUN
 clear_idx(sql_trans *tr, sql_idx *i)
 {
+       bool update_conflict = false;
        sql_delta *delta, *odelta = ATOMIC_PTR_GET(&i->data);
 
        if (!isTable(i->t) || (hash_index(i->type) && list_length(i->columns) <= 1) || !idx_has_column(i->type))
                return 0;
-       if ((delta = bind_idx_data(tr, i, NULL)) == NULL)
-               return BUN_NONE;
+       if ((delta = bind_idx_data(tr, i, &update_conflict)) == NULL)
+               return update_conflict ? LOG_CONFLICT : LOG_ERR;
        if ((!inTransaction(tr, i->t) && (odelta != delta || isTempTable(i->t)) && isGlobal(i->t)) || (!isNew(i->t) && isLocalTemp(i->t)))
                trans_add(tr, &i->base, delta, &tc_gc_idx, &commit_update_idx, isLocalTemp(i->t)?NULL:&log_update_idx);
        if (delta)
@@ -2338,23 +2357,31 @@ clear_storage(sql_trans *tr, storage *s)
        return sz;
 }
 
-/* this function returns BUN_NONE on LOG_ERR and BUN_NONE - 1 on LOG_CONFLICT */
+
+/*
+ * Clear the table, in general this means replacing the storage,
+ * but in case of earlier deletes (or inserts by this transaction), we only mark
+ * all segments as deleted.
+ * this function returns BUN_NONE on LOG_ERR and BUN_NONE - 1 on LOG_CONFLICT
+ */
 static BUN
-clear_del(sql_trans *tr, sql_table *t)
+clear_del(sql_trans *tr, sql_table *t, int in_transaction)
 {
-       int in_transaction = segments_in_transaction(tr, t), ok = LOG_OK;
+       int clear = !in_transaction || isTempTable(t), ok = LOG_OK;
+       bool conflict = false;
        storage *bat;
 
-       if ((bat = bind_del_data(tr, t)) == NULL)
-               return BUN_NONE;
-       if (!isTempTable(t)) {
+       if ((bat = bind_del_data(tr, t, clear?&conflict:NULL)) == NULL)
+               return conflict?BUN_NONE-1:BUN_NONE;
+
+       if (!clear) {
                lock_table(tr->store, t->base.id);
                ok = delete_range(tr, bat, 0, bat->segs->t->end);
                unlock_table(tr->store, t->base.id);
        }
        if ((!inTransaction(tr, t) && !in_transaction && isGlobal(t)) || (!isNew(t) && isLocalTemp(t)))
                trans_add(tr, &t->base, bat, &tc_gc_del, &commit_update_del, isLocalTemp(t)?NULL:&log_update_del);
-       if (ok == LOG_OK && isTempTable(t))
+       if (clear && ok == LOG_OK)
                return clear_storage(tr, bat);
        if (ok == LOG_ERR)
                return BUN_NONE;
@@ -2367,29 +2394,32 @@ clear_del(sql_trans *tr, sql_table *t)
 static BUN
 clear_table(sql_trans *tr, sql_table *t)
 {
+       int in_transaction = segments_in_transaction(tr, t);
+       int clear = !in_transaction || isTempTable(t);
+
        node *n = ol_first_node(t->columns);
        sql_column *c = n->data;
        BUN sz = count_col(tr, c, 0), clear_ok;
 
        storage *d = tab_timestamp_storage(tr, t);
        sz -= count_deletes_in_range(d->segs->h, tr, 0, sz);
-       if ((clear_ok = clear_del(tr, t)) >= BUN_NONE - 1)
+       if ((clear_ok = clear_del(tr, t, in_transaction)) >= BUN_NONE - 1)
                return clear_ok;
 
-       if (isTempTable(t)) { /* temp tables switch too new bats */
+       if (clear) {
                for (; n; n = n->next) {
                        c = n->data;
 
-                       if (clear_col(tr, c) == BUN_NONE)
-                               return BUN_NONE;
+                       if ((clear_ok = clear_col(tr, c)) >= BUN_NONE - 1)
+                               return clear_ok;
                }
                if (t->idxs) {
                        for (n = ol_first_node(t->idxs); n; n = n->next) {
                                sql_idx *ci = n->data;
 
                                if (isTable(ci->t) && idx_has_column(ci->type) &&
-                                       clear_idx(tr, ci) == BUN_NONE)
-                                       return BUN_NONE;
+                                       (clear_ok = clear_idx(tr, ci)) >= BUN_NONE - 1)
+                                       return clear_ok;
                        }
                }
        }
@@ -2503,6 +2533,8 @@ static int
 log_storage(sql_trans *tr, sql_table *t, storage *s, sqlid id)
 {
        int ok = segments2cs(tr, s->segs, &s->cs);
+       if (ok == LOG_OK && s->cs.cleared)
+               return tr_log_cs(tr, t, &s->cs, s->segs->h, t->base.id);
        if (ok == LOG_OK)
                ok = log_table_append(tr, t, s->segs);
        if (ok == LOG_OK)
@@ -2571,7 +2603,6 @@ tr_merge_storage(sql_trans *tr, storage 
        int ok = tr_merge_cs(tr, &tdb->cs);
 
        if (tdb->next) {
-               assert(0);
                ok = destroy_storage(tdb->next);
                tdb->next = NULL;
        }
@@ -2669,6 +2700,22 @@ tc_gc_rollbacked( sql_store Store, sql_c
 }
 
 static int
+tc_gc_rollbacked_storage( sql_store Store, sql_change *change, ulng oldest)
+{
+       sqlstore *store = Store;
+
+       storage *d = (storage*)change->data;
+       if (d->cs.ts < oldest) {
+               destroy_storage(d);
+               return 1;
+       }
+       if (d->cs.ts > TRANSACTION_ID_BASE)
+               d->cs.ts = store_get_timestamp(store) + 1;
+       return 0;
+}
+
+
+static int
 commit_update_col( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
 {
        int ok = LOG_OK;
@@ -2858,9 +2905,26 @@ commit_update_del( sql_trans *tr, sql_ch
        }
        lock_table(tr->store, t->base.id);
        if (!commit_ts) { /* rollback */
-               rollback_segments(dbat->segs, tr, change, oldest);
+               if (dbat->cs.ts == tr->tid) {
+                       storage *d = change->data, *o = ATOMIC_PTR_GET(&t->data);
+
+                       if (o != d) {
+                               while(o && o->next != d)
+                                       o = o->next;
+                       }
+                       if (o == ATOMIC_PTR_GET(&t->data)) {
+                               assert(d->next);
+                               ATOMIC_PTR_SET(&t->data, d->next);
+                       } else
+                               o->next = d->next;
+                       d->next = NULL;
+                       change->cleanup = &tc_gc_rollbacked_storage;
+               } else
+                       rollback_segments(dbat->segs, tr, change, oldest);
        } else if (ok == LOG_OK && !tr->parent) {
                storage *d = dbat;
+               if (dbat->cs.ts == tr->tid) /* cleared table */
+                       dbat->cs.ts = commit_ts;
                merge_segments(dbat->segs, tr, change, commit_ts, oldest);
                if (ok == LOG_OK && dbat == d && oldest == commit_ts)
                        ok = tr_merge_storage(tr, dbat);
@@ -3106,7 +3170,7 @@ claim_tab(sql_trans *tr, sql_table *t, s
 
        /* we have a single segment structure for each persistent table
         * for temporary tables each has its own */
-       if ((s = bind_del_data(tr, t)) == NULL)
+       if ((s = bind_del_data(tr, t, NULL)) == NULL)
                return NULL;
 
        lock_table(tr->store, t->base.id);
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to