Changeset: 4cb531b29855 for MonetDB URL: https://dev.monetdb.org/hg/MonetDB/rev/4cb531b29855 Modified Files: sql/storage/bat/bat_storage.c Branch: scatter Log Message:
merged with jul2021 diffs (273 lines): diff --git a/sql/storage/bat/bat_storage.c b/sql/storage/bat/bat_storage.c --- a/sql/storage/bat/bat_storage.c +++ b/sql/storage/bat/bat_storage.c @@ -77,7 +77,7 @@ unlock_table(sqlstore *store, sqlid id) static int tc_gc_seg( sql_store Store, sql_change *change, ulng oldest) { - (void)Store; + (void)Store; segment *s = change->data; if (s->ts <= oldest) { @@ -1010,7 +1010,7 @@ bind_col_data(sql_trans *tr, sql_column if (isTempTable(c->t)) obat = temp_col_timestamp_delta(tr, c); - if (obat->cs.ts == tr->tid || !update_conflict) /* on append there are no conflicts */ + if (obat->cs.ts == tr->tid || ((obat->cs.ts < TRANSACTION_ID_BASE || tr_version_of_parent(tr, obat->cs.ts)) && !update_conflict)) /* on append there are no conflicts */ return obat; if ((!tr->parent || !tr_version_of_parent(tr, obat->cs.ts)) && obat->cs.ts >= TRANSACTION_ID_BASE && !isTempTable(c->t)) { /* abort */ @@ -1075,7 +1075,7 @@ bind_idx_data(sql_trans *tr, sql_idx *i, if (isTempTable(i->t)) obat = temp_idx_timestamp_delta(tr, i); - if (obat->cs.ts == tr->tid || !update_conflict) + if (obat->cs.ts == tr->tid || ((obat->cs.ts < TRANSACTION_ID_BASE || tr_version_of_parent(tr, obat->cs.ts)) && !update_conflict)) /* on append there are no conflicts */ return obat; if ((!tr->parent || !tr_version_of_parent(tr, obat->cs.ts)) && obat->cs.ts >= TRANSACTION_ID_BASE && !isTempTable(i->t)) { /* abort */ @@ -1404,8 +1404,17 @@ destroy_storage(storage *bat) return ok; } +static int +segments_conflict(sql_trans *tr, segments *segs) +{ + for (segment *s = segs->h; s; s = s->next) + if (!VALID_4_READ(s->ts,tr)) + return 1; + return 0; +} + static storage * -bind_del_data(sql_trans *tr, sql_table *t) +bind_del_data(sql_trans *tr, sql_table *t, bool *clear) { storage *obat = ATOMIC_PTR_GET(&t->data); @@ -1414,18 +1423,26 @@ bind_del_data(sql_trans *tr, sql_table * if (obat->cs.ts == tr->tid) return obat; - if ((!tr->parent || !tr_version_of_parent(tr, obat->cs.ts)) && obat->cs.ts >= TRANSACTION_ID_BASE && !isTempTable(t)) + if ((!tr->parent || !tr_version_of_parent(tr, obat->cs.ts)) && obat->cs.ts >= TRANSACTION_ID_BASE && !isTempTable(t)) { /* abort */ + if (clear) + *clear = true; return NULL; - if (!isTempTable(t)) + } + if (!isTempTable(t) && !clear) return obat; + if (!isTempTable(t) && clear && segments_conflict(tr, obat->segs)) { + *clear = true; + return NULL; + } + assert(!isTempTable(t)); obat = timestamp_storage(tr, ATOMIC_PTR_GET(&t->data)); storage *bat = ZNEW(storage); if(!bat) return NULL; bat->cs.refcnt = 1; - dup_storage(tr, obat, bat, isTempTable(t)); + dup_storage(tr, obat, bat, clear || isTempTable(t) /* for clear and temp create empty storage */); bat->cs.ts = tr->tid; /* only one writer else abort */ bat->next = obat; @@ -1447,7 +1464,7 @@ delete_tab(sql_trans *tr, sql_table * t, if (tpe == TYPE_bat && !BATcount(b)) return ok; - if ((bat = bind_del_data(tr, t)) == NULL) + if ((bat = bind_del_data(tr, t, NULL)) == NULL) return LOG_ERR; lock_table(tr->store, t->base.id); @@ -2298,10 +2315,11 @@ clear_cs(sql_trans *tr, column_storage * static BUN clear_col(sql_trans *tr, sql_column *c) { + bool update_conflict = false; sql_delta *delta, *odelta = ATOMIC_PTR_GET(&c->data); - if ((delta = bind_col_data(tr, c, NULL)) == NULL) - return BUN_NONE; + if ((delta = bind_col_data(tr, c, &update_conflict)) == NULL) + return update_conflict ? LOG_CONFLICT : LOG_ERR; if ((!inTransaction(tr, c->t) && (odelta != delta || isTempTable(c->t)) && isGlobal(c->t)) || (!isNew(c->t) && isLocalTemp(c->t))) trans_add(tr, &c->base, delta, &tc_gc_col, &commit_update_col, isLocalTemp(c->t)?NULL:&log_update_col); if (delta) @@ -2312,12 +2330,13 @@ clear_col(sql_trans *tr, sql_column *c) static BUN clear_idx(sql_trans *tr, sql_idx *i) { + bool update_conflict = false; sql_delta *delta, *odelta = ATOMIC_PTR_GET(&i->data); if (!isTable(i->t) || (hash_index(i->type) && list_length(i->columns) <= 1) || !idx_has_column(i->type)) return 0; - if ((delta = bind_idx_data(tr, i, NULL)) == NULL) - return BUN_NONE; + if ((delta = bind_idx_data(tr, i, &update_conflict)) == NULL) + return update_conflict ? LOG_CONFLICT : LOG_ERR; if ((!inTransaction(tr, i->t) && (odelta != delta || isTempTable(i->t)) && isGlobal(i->t)) || (!isNew(i->t) && isLocalTemp(i->t))) trans_add(tr, &i->base, delta, &tc_gc_idx, &commit_update_idx, isLocalTemp(i->t)?NULL:&log_update_idx); if (delta) @@ -2338,23 +2357,31 @@ clear_storage(sql_trans *tr, storage *s) return sz; } -/* this function returns BUN_NONE on LOG_ERR and BUN_NONE - 1 on LOG_CONFLICT */ + +/* + * Clear the table, in general this means replacing the storage, + * but in case of earlier deletes (or inserts by this transaction), we only mark + * all segments as deleted. + * this function returns BUN_NONE on LOG_ERR and BUN_NONE - 1 on LOG_CONFLICT + */ static BUN -clear_del(sql_trans *tr, sql_table *t) +clear_del(sql_trans *tr, sql_table *t, int in_transaction) { - int in_transaction = segments_in_transaction(tr, t), ok = LOG_OK; + int clear = !in_transaction || isTempTable(t), ok = LOG_OK; + bool conflict = false; storage *bat; - if ((bat = bind_del_data(tr, t)) == NULL) - return BUN_NONE; - if (!isTempTable(t)) { + if ((bat = bind_del_data(tr, t, clear?&conflict:NULL)) == NULL) + return conflict?BUN_NONE-1:BUN_NONE; + + if (!clear) { lock_table(tr->store, t->base.id); ok = delete_range(tr, bat, 0, bat->segs->t->end); unlock_table(tr->store, t->base.id); } if ((!inTransaction(tr, t) && !in_transaction && isGlobal(t)) || (!isNew(t) && isLocalTemp(t))) trans_add(tr, &t->base, bat, &tc_gc_del, &commit_update_del, isLocalTemp(t)?NULL:&log_update_del); - if (ok == LOG_OK && isTempTable(t)) + if (clear && ok == LOG_OK) return clear_storage(tr, bat); if (ok == LOG_ERR) return BUN_NONE; @@ -2367,29 +2394,32 @@ clear_del(sql_trans *tr, sql_table *t) static BUN clear_table(sql_trans *tr, sql_table *t) { + int in_transaction = segments_in_transaction(tr, t); + int clear = !in_transaction || isTempTable(t); + node *n = ol_first_node(t->columns); sql_column *c = n->data; BUN sz = count_col(tr, c, 0), clear_ok; storage *d = tab_timestamp_storage(tr, t); sz -= count_deletes_in_range(d->segs->h, tr, 0, sz); - if ((clear_ok = clear_del(tr, t)) >= BUN_NONE - 1) + if ((clear_ok = clear_del(tr, t, in_transaction)) >= BUN_NONE - 1) return clear_ok; - if (isTempTable(t)) { /* temp tables switch too new bats */ + if (clear) { for (; n; n = n->next) { c = n->data; - if (clear_col(tr, c) == BUN_NONE) - return BUN_NONE; + if ((clear_ok = clear_col(tr, c)) >= BUN_NONE - 1) + return clear_ok; } if (t->idxs) { for (n = ol_first_node(t->idxs); n; n = n->next) { sql_idx *ci = n->data; if (isTable(ci->t) && idx_has_column(ci->type) && - clear_idx(tr, ci) == BUN_NONE) - return BUN_NONE; + (clear_ok = clear_idx(tr, ci)) >= BUN_NONE - 1) + return clear_ok; } } } @@ -2503,6 +2533,8 @@ static int log_storage(sql_trans *tr, sql_table *t, storage *s, sqlid id) { int ok = segments2cs(tr, s->segs, &s->cs); + if (ok == LOG_OK && s->cs.cleared) + return tr_log_cs(tr, t, &s->cs, s->segs->h, t->base.id); if (ok == LOG_OK) ok = log_table_append(tr, t, s->segs); if (ok == LOG_OK) @@ -2571,7 +2603,6 @@ tr_merge_storage(sql_trans *tr, storage int ok = tr_merge_cs(tr, &tdb->cs); if (tdb->next) { - assert(0); ok = destroy_storage(tdb->next); tdb->next = NULL; } @@ -2669,6 +2700,22 @@ tc_gc_rollbacked( sql_store Store, sql_c } static int +tc_gc_rollbacked_storage( sql_store Store, sql_change *change, ulng oldest) +{ + sqlstore *store = Store; + + storage *d = (storage*)change->data; + if (d->cs.ts < oldest) { + destroy_storage(d); + return 1; + } + if (d->cs.ts > TRANSACTION_ID_BASE) + d->cs.ts = store_get_timestamp(store) + 1; + return 0; +} + + +static int commit_update_col( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest) { int ok = LOG_OK; @@ -2858,9 +2905,26 @@ commit_update_del( sql_trans *tr, sql_ch } lock_table(tr->store, t->base.id); if (!commit_ts) { /* rollback */ - rollback_segments(dbat->segs, tr, change, oldest); + if (dbat->cs.ts == tr->tid) { + storage *d = change->data, *o = ATOMIC_PTR_GET(&t->data); + + if (o != d) { + while(o && o->next != d) + o = o->next; + } + if (o == ATOMIC_PTR_GET(&t->data)) { + assert(d->next); + ATOMIC_PTR_SET(&t->data, d->next); + } else + o->next = d->next; + d->next = NULL; + change->cleanup = &tc_gc_rollbacked_storage; + } else + rollback_segments(dbat->segs, tr, change, oldest); } else if (ok == LOG_OK && !tr->parent) { storage *d = dbat; + if (dbat->cs.ts == tr->tid) /* cleared table */ + dbat->cs.ts = commit_ts; merge_segments(dbat->segs, tr, change, commit_ts, oldest); if (ok == LOG_OK && dbat == d && oldest == commit_ts) ok = tr_merge_storage(tr, dbat); @@ -3106,7 +3170,7 @@ claim_tab(sql_trans *tr, sql_table *t, s /* we have a single segment structure for each persistent table * for temporary tables each has its own */ - if ((s = bind_del_data(tr, t)) == NULL) + if ((s = bind_del_data(tr, t, NULL)) == NULL) return NULL; lock_table(tr->store, t->base.id); _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list