Changeset: 0010fd8975fd for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=0010fd8975fd
Modified Files:
	sql/storage/bat/bat_storage.c
	sql/storage/sql_storage.h
	sql/storage/store.c
Branch: unlock
Log Message:
add some more locks to make sure logrotation and appending are exclusive diffs (249 lines): diff --git a/sql/storage/bat/bat_storage.c b/sql/storage/bat/bat_storage.c --- a/sql/storage/bat/bat_storage.c +++ b/sql/storage/bat/bat_storage.c @@ -806,10 +806,12 @@ append_col(sql_trans *tr, sql_column *c, bat = c->data; /* appends only write */ bat->cs.wtime = c->base.atime = c->t->base.atime = c->t->s->base.atime = tr->atime = tr->wstime; + lock_table(c->t->base.id); if (tpe == TYPE_bat) ok = delta_append_bat(bat, offset, i, c->t); else ok = delta_append_val(bat, offset, i, c->t); + unlock_table(c->t->base.id); return ok; } @@ -829,10 +831,12 @@ append_idx(sql_trans *tr, sql_idx * i, s bat = i->data; /* appends only write */ bat->cs.wtime = i->base.atime = i->t->base.atime = i->t->s->base.atime = tr->atime = tr->wstime; + lock_table(i->t->base.id); if (tpe == TYPE_bat) ok = delta_append_bat(bat, offset, ib, i->t); else ok = delta_append_val(bat, offset, ib, i->t); + unlock_table(i->t->base.id); return ok; } @@ -1826,7 +1830,7 @@ minmax( sql_trans *tr ) } static int -tr_update_cs( sql_trans *tr, column_storage *ocs, column_storage *ccs) +tr_update_cs( sql_trans *tr, column_storage *ocs, column_storage *ccs, BUN end) { int ok = LOG_OK; BAT *cur = NULL; @@ -1854,6 +1858,7 @@ tr_update_cs( sql_trans *tr, column_stor cur = temp_descriptor(ocs->bid); if(!cur) return LOG_ERR; + assert(end <= BATcount(cur)); if (ccs->ucnt && ccs->uibid) { assert(!cleared); BAT *ui = temp_descriptor(ccs->uibid); @@ -1902,9 +1907,9 @@ tr_update_cs( sql_trans *tr, column_stor } static int -tr_update_delta( sql_trans *tr, sql_delta *obat, sql_delta *cbat) +tr_update_delta( sql_trans *tr, sql_delta *obat, sql_delta *cbat, BUN end) { - int ok = tr_update_cs( tr, &obat->cs, &cbat->cs); + int ok = tr_update_cs( tr, &obat->cs, &cbat->cs, end); if (ok == LOG_OK && obat->next) { ok = destroy_bat(obat->next); @@ -1914,7 +1919,7 @@ tr_update_delta( sql_trans *tr, sql_delt } static int -tr_merge_cs( 
sql_trans *tr, column_storage *cs) +tr_merge_cs( sql_trans *tr, column_storage *cs, BUN end) { int ok = LOG_OK; BAT *cur = NULL; @@ -1929,6 +1934,7 @@ tr_merge_cs( sql_trans *tr, column_stora return LOG_ERR; } + assert(end <= BATcount(cur)); if (cs->ucnt) { BAT *ui = temp_descriptor(cs->uibid); BAT *uv = temp_descriptor(cs->uvbid); @@ -1966,9 +1972,9 @@ tr_merge_cs( sql_trans *tr, column_stora } static int -tr_merge_delta( sql_trans *tr, sql_delta *obat) +tr_merge_delta( sql_trans *tr, sql_delta *obat, BUN end) { - int ok = tr_merge_cs(tr, &obat->cs); + int ok = tr_merge_cs(tr, &obat->cs, end); if (obat->next) { ok = destroy_bat(obat->next); obat->next = NULL; @@ -2014,7 +2020,7 @@ tr_update_dbat( sql_trans *tr, storage * } ts->cnt += fs->ucnt; ts->cnt -= fs->icnt; - int ok = tr_update_cs( tr, &ts->cs, &fs->cs); + int ok = tr_update_cs( tr, &ts->cs, &fs->cs, ts->end); if (ok == LOG_OK && ts->next) { ok = destroy_dbat(tr, ts->next); ts->next = NULL; @@ -2025,7 +2031,7 @@ tr_update_dbat( sql_trans *tr, storage * static int tr_merge_dbat(sql_trans *tr, storage *tdb) { - int ok = tr_merge_cs(tr, &tdb->cs); + int ok = tr_merge_cs(tr, &tdb->cs, tdb->end); if (tdb->next) { ok = destroy_dbat(tr, tdb->next); tdb->next = NULL; @@ -2040,6 +2046,7 @@ update_table(sql_trans *tr, sql_table *f sql_table *ot = NULL; int ok = LOG_OK; node *n, *m, *o = NULL; + segments *sg = NULL; if (ATOMIC_GET(&store_nr_active) == 1 || ft->base.allocated) { if (ATOMIC_GET(&store_nr_active) > 1 && ft->data) { /* move delta */ @@ -2064,6 +2071,7 @@ update_table(sql_trans *tr, sql_table *f } } } else if (tt->data && ft->base.allocated) { + sg = ((storage*)ft->data)->segs; if (tr_update_dbat(tr, tt->data, ft->data) != LOG_OK) ok = LOG_ERR; } else if (ATOMIC_GET(&store_nr_active) == 1 && !ft->base.allocated) { @@ -2073,6 +2081,7 @@ update_table(sql_trans *tr, sql_table *f tt->data = timestamp_dbat(ot->data, tt->base.stime); } assert(tt->data); + sg = ((storage*)tt->data)->segs; if (tr_merge_dbat(tr, 
tt->data) != LOG_OK) ok = LOG_ERR; ft->data = NULL; @@ -2115,7 +2124,7 @@ update_table(sql_trans *tr, sql_table *f } } } else if (oc->data && cc->base.allocated) { - if (tr_update_delta(tr, oc->data, cc->data) != LOG_OK) + if (tr_update_delta(tr, oc->data, cc->data, sg->end) != LOG_OK) ok = LOG_ERR; } else if (ATOMIC_GET(&store_nr_active) == 1 && !cc->base.allocated) { /* only deletes, merge earlier changes */ @@ -2125,7 +2134,7 @@ update_table(sql_trans *tr, sql_table *f } assert(oc->data); if (cc->base.wtime) { - if (tr_merge_delta(tr, oc->data) != LOG_OK) + if (tr_merge_delta(tr, oc->data, sg->end) != LOG_OK) ok = LOG_ERR; cc->data = NULL; } @@ -2202,7 +2211,7 @@ update_table(sql_trans *tr, sql_table *f } } } else if (oi->data && ci->base.allocated) { - if (tr_update_delta(tr, oi->data, ci->data) != LOG_OK) + if (tr_update_delta(tr, oi->data, ci->data, sg->end) != LOG_OK) ok = LOG_ERR; } else if (ATOMIC_GET(&store_nr_active) == 1 && !ci->base.allocated) { if (!oi->data) { @@ -2211,7 +2220,7 @@ update_table(sql_trans *tr, sql_table *f } assert(oi->data); if (ci->base.wtime) { - if (tr_merge_delta(tr, oi->data) != LOG_OK) + if (tr_merge_delta(tr, oi->data, sg->end) != LOG_OK) ok = LOG_ERR; ci->data = NULL; } @@ -2246,6 +2255,15 @@ update_table(sql_trans *tr, sql_table *f } static int +rollback_table(sql_trans *tr, sql_table *t) +{ + (void)tr; + (void)t; + fprintf(stderr, "rollback %s.%s\n", t->s->base.name, t->base.name); + return 0; +} + +static int tr_log_cs( sql_trans *tr, column_storage *cs, segment *segs, int cleared, sqlid id) { int ok = GDK_SUCCEED; @@ -2452,6 +2470,7 @@ bat_storage_init( store_functions *sf) sf->update_table = (update_table_fptr)&update_table; sf->log_table = (update_table_fptr)&log_table; sf->gtrans_minmax = (gtrans_update_fptr)&minmax; + sf->rollback_table = (clear_table_fptr)&rollback_table; sf->cleanup = (cleanup_fptr)&cleanup; diff --git a/sql/storage/sql_storage.h b/sql/storage/sql_storage.h --- a/sql/storage/sql_storage.h +++ 
b/sql/storage/sql_storage.h @@ -262,6 +262,7 @@ typedef struct store_functions { update_table_fptr log_table; update_table_fptr update_table; gtrans_update_fptr gtrans_minmax; + clear_table_fptr rollback_table; cleanup_fptr cleanup; } store_functions; diff --git a/sql/storage/store.c b/sql/storage/store.c --- a/sql/storage/store.c +++ b/sql/storage/store.c @@ -7420,6 +7420,36 @@ sql_session_destroy(sql_session *s) } static void +table_rollback(sql_trans *tr, sql_table *t) +{ + store_funcs.rollback_table(tr, t); +} + +static void +schema_rollback(sql_trans *tr, sql_schema *s) +{ + if (s->tables.set) { + for (node *n = s->tables.set->h; n; n = n->next) { + sql_table *t = n->data; + if (t->base.atime) + table_rollback(tr, t); + } + } +} + +static void +sql_trans_rollback(sql_trans *tr) +{ + sql_schema *tmp = find_sql_schema(tr, "tmp"); + + for (node *m = tr->schemas.set->h; m; m = m->next) { + sql_schema *s = m->data; + if (s->base.atime && s != tmp) + schema_rollback(tr, s); + } +} + +static void sql_trans_reset_tmp(sql_trans *tr, int commit) { sql_schema *tmp = find_sql_schema(tr, "tmp"); @@ -7520,6 +7550,8 @@ sql_trans_end(sql_session *s, int commit TRC_DEBUG(SQL_STORE, "End of transaction: %d\n", s->tr->schema_number); s->tr->active = 0; s->auto_commit = s->ac_on_commit; + if(!commit && s->tr->atime) + sql_trans_rollback(s->tr); sql_trans_reset_tmp(s->tr, commit); /* reset temp schema */ if (s->tr->parent == gtrans) { list_move_data(active_sessions, passive_sessions, s); _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list