Changeset: 0010fd8975fd for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=0010fd8975fd
Modified Files:
        sql/storage/bat/bat_storage.c
        sql/storage/sql_storage.h
        sql/storage/store.c
Branch: unlock
Log Message:

add some more locks to make sure logrotation and appending are exclusive


diffs (249 lines):

diff --git a/sql/storage/bat/bat_storage.c b/sql/storage/bat/bat_storage.c
--- a/sql/storage/bat/bat_storage.c
+++ b/sql/storage/bat/bat_storage.c
@@ -806,10 +806,12 @@ append_col(sql_trans *tr, sql_column *c,
        bat = c->data;
        /* appends only write */
        bat->cs.wtime = c->base.atime = c->t->base.atime = c->t->s->base.atime 
= tr->atime = tr->wstime;
+       lock_table(c->t->base.id);
        if (tpe == TYPE_bat)
                ok = delta_append_bat(bat, offset, i, c->t);
        else
                ok = delta_append_val(bat, offset, i, c->t);
+       unlock_table(c->t->base.id);
        return ok;
 }
 
@@ -829,10 +831,12 @@ append_idx(sql_trans *tr, sql_idx * i, s
        bat = i->data;
        /* appends only write */
        bat->cs.wtime = i->base.atime = i->t->base.atime = i->t->s->base.atime 
= tr->atime = tr->wstime;
+       lock_table(i->t->base.id);
        if (tpe == TYPE_bat)
                ok = delta_append_bat(bat, offset, ib, i->t);
        else
                ok = delta_append_val(bat, offset, ib, i->t);
+       unlock_table(i->t->base.id);
        return ok;
 }
 
@@ -1826,7 +1830,7 @@ minmax( sql_trans *tr )
 }
 
 static int
-tr_update_cs( sql_trans *tr, column_storage *ocs, column_storage *ccs)
+tr_update_cs( sql_trans *tr, column_storage *ocs, column_storage *ccs, BUN end)
 {
        int ok = LOG_OK;
        BAT *cur = NULL;
@@ -1854,6 +1858,7 @@ tr_update_cs( sql_trans *tr, column_stor
        cur = temp_descriptor(ocs->bid);
        if(!cur)
                return LOG_ERR;
+       assert(end <= BATcount(cur));
        if (ccs->ucnt && ccs->uibid) {
                assert(!cleared);
                BAT *ui = temp_descriptor(ccs->uibid);
@@ -1902,9 +1907,9 @@ tr_update_cs( sql_trans *tr, column_stor
 }
 
 static int
-tr_update_delta( sql_trans *tr, sql_delta *obat, sql_delta *cbat)
+tr_update_delta( sql_trans *tr, sql_delta *obat, sql_delta *cbat, BUN end)
 {
-       int ok = tr_update_cs( tr, &obat->cs, &cbat->cs);
+       int ok = tr_update_cs( tr, &obat->cs, &cbat->cs, end);
 
        if (ok == LOG_OK && obat->next) {
                ok = destroy_bat(obat->next);
@@ -1914,7 +1919,7 @@ tr_update_delta( sql_trans *tr, sql_delt
 }
 
 static int
-tr_merge_cs( sql_trans *tr, column_storage *cs)
+tr_merge_cs( sql_trans *tr, column_storage *cs, BUN end)
 {
        int ok = LOG_OK;
        BAT *cur = NULL;
@@ -1929,6 +1934,7 @@ tr_merge_cs( sql_trans *tr, column_stora
                        return LOG_ERR;
        }
 
+       assert(end <= BATcount(cur));
        if (cs->ucnt) {
                BAT *ui = temp_descriptor(cs->uibid);
                BAT *uv = temp_descriptor(cs->uvbid);
@@ -1966,9 +1972,9 @@ tr_merge_cs( sql_trans *tr, column_stora
 }
 
 static int
-tr_merge_delta( sql_trans *tr, sql_delta *obat)
+tr_merge_delta( sql_trans *tr, sql_delta *obat, BUN end)
 {
-       int ok = tr_merge_cs(tr, &obat->cs);
+       int ok = tr_merge_cs(tr, &obat->cs, end);
        if (obat->next) {
                ok = destroy_bat(obat->next);
                obat->next = NULL;
@@ -2014,7 +2020,7 @@ tr_update_dbat( sql_trans *tr, storage *
        }
        ts->cnt += fs->ucnt;
        ts->cnt -= fs->icnt;
-       int ok = tr_update_cs( tr, &ts->cs, &fs->cs);
+       int ok = tr_update_cs( tr, &ts->cs, &fs->cs, ts->end);
        if (ok == LOG_OK && ts->next) {
                ok = destroy_dbat(tr, ts->next);
                ts->next = NULL;
@@ -2025,7 +2031,7 @@ tr_update_dbat( sql_trans *tr, storage *
 static int
 tr_merge_dbat(sql_trans *tr, storage *tdb)
 {
-       int ok = tr_merge_cs(tr, &tdb->cs);
+       int ok = tr_merge_cs(tr, &tdb->cs, tdb->end);
        if (tdb->next) {
                ok = destroy_dbat(tr, tdb->next);
                tdb->next = NULL;
@@ -2040,6 +2046,7 @@ update_table(sql_trans *tr, sql_table *f
        sql_table *ot = NULL;
        int ok = LOG_OK;
        node *n, *m, *o = NULL;
+       segments *sg = NULL;
 
        if (ATOMIC_GET(&store_nr_active) == 1 || ft->base.allocated) {
                if (ATOMIC_GET(&store_nr_active) > 1 && ft->data) { /* move 
delta */
@@ -2064,6 +2071,7 @@ update_table(sql_trans *tr, sql_table *f
                                }
                        }
                } else if (tt->data && ft->base.allocated) {
+                       sg = ((storage*)ft->data)->segs;
                        if (tr_update_dbat(tr, tt->data, ft->data) != LOG_OK)
                                ok = LOG_ERR;
                } else if (ATOMIC_GET(&store_nr_active) == 1 && 
!ft->base.allocated) {
@@ -2073,6 +2081,7 @@ update_table(sql_trans *tr, sql_table *f
                                tt->data = timestamp_dbat(ot->data, 
tt->base.stime);
                        }
                        assert(tt->data);
+                       sg = ((storage*)tt->data)->segs;
                        if (tr_merge_dbat(tr, tt->data) != LOG_OK)
                                ok = LOG_ERR;
                        ft->data = NULL;
@@ -2115,7 +2124,7 @@ update_table(sql_trans *tr, sql_table *f
                                        }
                                }
                        } else if (oc->data && cc->base.allocated) {
-                               if (tr_update_delta(tr, oc->data, cc->data) != 
LOG_OK)
+                               if (tr_update_delta(tr, oc->data, cc->data, 
sg->end) != LOG_OK)
                                        ok = LOG_ERR;
                        } else if (ATOMIC_GET(&store_nr_active) == 1 && 
!cc->base.allocated) {
                                /* only deletes, merge earlier changes */
@@ -2125,7 +2134,7 @@ update_table(sql_trans *tr, sql_table *f
                                }
                                assert(oc->data);
                                if (cc->base.wtime) {
-                                       if (tr_merge_delta(tr, oc->data) != 
LOG_OK)
+                                       if (tr_merge_delta(tr, oc->data, 
sg->end) != LOG_OK)
                                                ok = LOG_ERR;
                                        cc->data = NULL;
                                }
@@ -2202,7 +2211,7 @@ update_table(sql_trans *tr, sql_table *f
                                                }
                                        }
                                } else if (oi->data && ci->base.allocated) {
-                                       if (tr_update_delta(tr, oi->data, 
ci->data) != LOG_OK)
+                                       if (tr_update_delta(tr, oi->data, 
ci->data, sg->end) != LOG_OK)
                                                ok = LOG_ERR;
                                } else if (ATOMIC_GET(&store_nr_active) == 1 && 
!ci->base.allocated) {
                                        if (!oi->data) {
@@ -2211,7 +2220,7 @@ update_table(sql_trans *tr, sql_table *f
                                        }
                                        assert(oi->data);
                                        if (ci->base.wtime) {
-                                               if (tr_merge_delta(tr, 
oi->data) != LOG_OK)
+                                               if (tr_merge_delta(tr, 
oi->data, sg->end) != LOG_OK)
                                                        ok = LOG_ERR;
                                                ci->data = NULL;
                                        }
@@ -2246,6 +2255,15 @@ update_table(sql_trans *tr, sql_table *f
 }
 
 static int
+rollback_table(sql_trans *tr, sql_table *t)
+{
+       (void)tr;
+       (void)t;
+       fprintf(stderr, "rollback %s.%s\n", t->s->base.name, t->base.name);
+       return 0;
+}
+
+static int
 tr_log_cs( sql_trans *tr, column_storage *cs, segment *segs, int cleared, 
sqlid id)
 {
        int ok = GDK_SUCCEED;
@@ -2452,6 +2470,7 @@ bat_storage_init( store_functions *sf)
        sf->update_table = (update_table_fptr)&update_table;
        sf->log_table = (update_table_fptr)&log_table;
        sf->gtrans_minmax = (gtrans_update_fptr)&minmax;
+       sf->rollback_table = (clear_table_fptr)&rollback_table;
 
        sf->cleanup = (cleanup_fptr)&cleanup;
 
diff --git a/sql/storage/sql_storage.h b/sql/storage/sql_storage.h
--- a/sql/storage/sql_storage.h
+++ b/sql/storage/sql_storage.h
@@ -262,6 +262,7 @@ typedef struct store_functions {
        update_table_fptr log_table;
        update_table_fptr update_table;
        gtrans_update_fptr gtrans_minmax;
+       clear_table_fptr rollback_table;
 
        cleanup_fptr cleanup;
 } store_functions;
diff --git a/sql/storage/store.c b/sql/storage/store.c
--- a/sql/storage/store.c
+++ b/sql/storage/store.c
@@ -7420,6 +7420,36 @@ sql_session_destroy(sql_session *s)
 }
 
 static void
+table_rollback(sql_trans *tr, sql_table *t)
+{
+       store_funcs.rollback_table(tr, t);
+}
+
+static void
+schema_rollback(sql_trans *tr, sql_schema *s)
+{
+       if (s->tables.set) {
+               for (node *n = s->tables.set->h; n; n = n->next) {
+                       sql_table *t = n->data;
+                       if (t->base.atime)
+                               table_rollback(tr, t);
+               }
+       }
+}
+
+static void
+sql_trans_rollback(sql_trans *tr) 
+{
+       sql_schema *tmp = find_sql_schema(tr, "tmp");
+
+       for (node *m = tr->schemas.set->h; m; m = m->next) {
+               sql_schema *s = m->data;
+               if (s->base.atime && s != tmp)
+                       schema_rollback(tr, s);
+       }
+}
+
+static void
 sql_trans_reset_tmp(sql_trans *tr, int commit)
 {
        sql_schema *tmp = find_sql_schema(tr, "tmp");
@@ -7520,6 +7550,8 @@ sql_trans_end(sql_session *s, int commit
        TRC_DEBUG(SQL_STORE, "End of transaction: %d\n", s->tr->schema_number);
        s->tr->active = 0;
        s->auto_commit = s->ac_on_commit;
+       if(!commit && s->tr->atime)
+               sql_trans_rollback(s->tr);
        sql_trans_reset_tmp(s->tr, commit); /* reset temp schema */
        if (s->tr->parent == gtrans) {
                list_move_data(active_sessions, passive_sessions, s);
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to