Changeset: 2306105f0143 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/2306105f0143
Modified Files:
        sql/include/sql_catalog.h
        sql/storage/objectset.c
        sql/storage/store.c
Branch: iso
Log Message:

Avoid race condition by testing for concurrent inserts inside the changeset 
under the store lock


diffs (206 lines):

diff --git a/sql/include/sql_catalog.h b/sql/include/sql_catalog.h
--- a/sql/include/sql_catalog.h
+++ b/sql/include/sql_catalog.h
@@ -244,7 +244,7 @@ typedef int (*tc_commit_fptr) (struct sq
 typedef int (*tc_cleanup_fptr) (sql_store store, struct sql_change *c, ulng 
oldest);   /* garbage collection, ie cleanup structures when possible */
 typedef void (*destroy_fptr)(sql_store store, sql_base *b);
 
-extern struct objectset *os_new(sql_allocator *sa, destroy_fptr destroy, bool 
temporary, bool unique, sql_store store);
+extern struct objectset *os_new(sql_allocator *sa, destroy_fptr destroy, bool 
temporary, bool unique, bool concurrent, sql_store store);
 extern struct objectset *os_dup(struct objectset *os);
 extern void os_destroy(struct objectset *os, sql_store store);
 extern int /*ok, error (name existed) and conflict (added before) */ 
os_add(struct objectset *os, struct sql_trans *tr, const char *name, sql_base 
*b);
diff --git a/sql/storage/objectset.c b/sql/storage/objectset.c
--- a/sql/storage/objectset.c
+++ b/sql/storage/objectset.c
@@ -59,8 +59,10 @@ typedef struct objectset {
        int id_based_cnt;
        struct sql_hash *name_map;
        struct sql_hash *id_map;
-       bool temporary;
-       bool unique;    /* names are unique */
+       u_int8_t
+               temporary:1,
+               unique:1, /* names are unique */
+               concurrent:1;   /* concurrent inserts are allowed */
        sql_store store;
 } objectset;
 
@@ -613,7 +615,7 @@ tc_commit_objectversion(sql_trans *tr, s
 }
 
 objectset *
-os_new(sql_allocator *sa, destroy_fptr destroy, bool temporary, bool unique, 
sql_store store)
+os_new(sql_allocator *sa, destroy_fptr destroy, bool temporary, bool unique, 
bool concurrent, sql_store store)
 {
        objectset *os = SA_NEW(sa, objectset);
        *os = (objectset) {
@@ -622,6 +624,7 @@ os_new(sql_allocator *sa, destroy_fptr d
                .destroy = destroy,
                .temporary = temporary,
                .unique = unique,
+               .concurrent = concurrent,
                .store = store
        };
        os->destroy = destroy;
@@ -849,6 +852,13 @@ os_add_(objectset *os, struct sql_trans 
        ov->b = b;
        ov->os = os;
 
+       if (!os->concurrent && os_has_changes(os, tr)) { /* for object sets 
without concurrent support, conflict if concurrent changes are there */
+               if (os->destroy)
+                       os->destroy(os->store, ov->b);
+               _DELETE(ov);
+               return -3; /* conflict */
+       }
+
        if ((res = os_add_id_based(os, tr, b->id, ov))) {
                if (os->destroy)
                        os->destroy(os->store, ov->b);
diff --git a/sql/storage/store.c b/sql/storage/store.c
--- a/sql/storage/store.c
+++ b/sql/storage/store.c
@@ -961,14 +961,14 @@ load_schema(sql_trans *tr, res_table *rt
                s->system = 
*(bte*)store->table_api.table_fetch_value(rt_schemas, find_sql_column(ss, 
"system"));
                s->owner = 
*(sqlid*)store->table_api.table_fetch_value(rt_schemas, find_sql_column(ss, 
"owner"));
 
-               s->tables = os_new(tr->sa, (destroy_fptr) &table_destroy, 
false, true, store);
-               s->types = os_new(tr->sa, (destroy_fptr) &type_destroy, false, 
true, store);
-               s->funcs = os_new(tr->sa, (destroy_fptr) &func_destroy, false, 
false, store);
-               s->seqs = os_new(tr->sa, (destroy_fptr) &seq_destroy, false, 
true, store);
-               s->keys = os_new(tr->sa, (destroy_fptr) &key_destroy, false, 
true, store);
-               s->idxs = os_new(tr->sa, (destroy_fptr) &idx_destroy, false, 
true, store);
-               s->triggers = os_new(tr->sa, (destroy_fptr) &trigger_destroy, 
false, true, store);
-               s->parts = os_new(tr->sa, (destroy_fptr) &part_destroy, false, 
false, store);
+               s->tables = os_new(tr->sa, (destroy_fptr) &table_destroy, 
false, true, true, store);
+               s->types = os_new(tr->sa, (destroy_fptr) &type_destroy, false, 
true, true, store);
+               s->funcs = os_new(tr->sa, (destroy_fptr) &func_destroy, false, 
false, false, store);
+               s->seqs = os_new(tr->sa, (destroy_fptr) &seq_destroy, false, 
true, true, store);
+               s->keys = os_new(tr->sa, (destroy_fptr) &key_destroy, false, 
true, true, store);
+               s->idxs = os_new(tr->sa, (destroy_fptr) &idx_destroy, false, 
true, true, store);
+               s->triggers = os_new(tr->sa, (destroy_fptr) &trigger_destroy, 
false, true, true, store);
+               s->parts = os_new(tr->sa, (destroy_fptr) &part_destroy, false, 
false, false, store);
        }
 
        TRC_DEBUG(SQL_STORE, "Load schema: %s %d\n", s->base.name, s->base.id);
@@ -1582,14 +1582,14 @@ bootstrap_create_schema(sql_trans *tr, c
        s->auth_id = auth_id;
        s->owner = owner;
        s->system = TRUE;
-       s->tables = os_new(tr->sa, (destroy_fptr) &table_destroy, false, true, 
store);
-       s->types = os_new(tr->sa, (destroy_fptr) &type_destroy, false, true, 
store);
-       s->funcs = os_new(tr->sa, (destroy_fptr) &func_destroy, false, false, 
store);
-       s->seqs = os_new(tr->sa, (destroy_fptr) &seq_destroy, false, true, 
store);
-       s->keys = os_new(tr->sa, (destroy_fptr) &key_destroy, false, true, 
store);
-       s->idxs = os_new(tr->sa, (destroy_fptr) &idx_destroy, false, true, 
store);
-       s->triggers = os_new(tr->sa, (destroy_fptr) &trigger_destroy, false, 
true, store);
-       s->parts = os_new(tr->sa, (destroy_fptr) &part_destroy, false, false, 
store);
+       s->tables = os_new(tr->sa, (destroy_fptr) &table_destroy, false, true, 
true, store);
+       s->types = os_new(tr->sa, (destroy_fptr) &type_destroy, false, true, 
true, store);
+       s->funcs = os_new(tr->sa, (destroy_fptr) &func_destroy, false, false, 
false, store);
+       s->seqs = os_new(tr->sa, (destroy_fptr) &seq_destroy, false, true, 
true, store);
+       s->keys = os_new(tr->sa, (destroy_fptr) &key_destroy, false, true, 
true, store);
+       s->idxs = os_new(tr->sa, (destroy_fptr) &idx_destroy, false, true, 
true, store);
+       s->triggers = os_new(tr->sa, (destroy_fptr) &trigger_destroy, false, 
true, true, store);
+       s->parts = os_new(tr->sa, (destroy_fptr) &part_destroy, false, false, 
false, store);
        if (os_add(tr->cat->schemas, tr, s->base.name, &s->base)) {
                return NULL;
        }
@@ -3364,8 +3364,8 @@ sql_trans_create_(sqlstore *store, sql_t
        tr->cat = store->cat;
        if (!tr->cat) {
                store->cat = tr->cat = SA_ZNEW(tr->sa, sql_catalog);
-               store->cat->schemas = os_new(tr->sa, (destroy_fptr) 
&schema_destroy, false, true, store);
-               store->cat->objects = os_new(tr->sa, (destroy_fptr) 
&key_destroy, false, false, store);
+               store->cat->schemas = os_new(tr->sa, (destroy_fptr) 
&schema_destroy, false, true, true, store);
+               store->cat->objects = os_new(tr->sa, (destroy_fptr) 
&key_destroy, false, false, true, store);
        }
        tr->tmp = store->tmp;
        cs_new(&tr->localtmps, tr->sa, (fdestroy) &table_destroy);
@@ -3387,12 +3387,12 @@ schema_dup(sql_trans *tr, sql_schema *s,
        ns->system = s->system;
 
        sqlstore *store = tr->store;
-       ns->tables = os_new(tr->sa, (destroy_fptr) &table_destroy, 
isTempSchema(s), true, store);
-       ns->seqs = os_new(tr->sa, (destroy_fptr) &seq_destroy, isTempSchema(s), 
true, store);
-       ns->keys = os_new(tr->sa, (destroy_fptr) &key_destroy, isTempSchema(s), 
true, store);
-       ns->idxs = os_new(tr->sa, (destroy_fptr) &idx_destroy, isTempSchema(s), 
true, store);
-       ns->triggers = os_new(tr->sa, (destroy_fptr) &trigger_destroy, 
isTempSchema(s), true, store);
-       ns->parts = os_new(tr->sa, (destroy_fptr) &part_destroy, 
isTempSchema(s), false, store);
+       ns->tables = os_new(tr->sa, (destroy_fptr) &table_destroy, 
isTempSchema(s), true, true, store);
+       ns->seqs = os_new(tr->sa, (destroy_fptr) &seq_destroy, isTempSchema(s), 
true, true, store);
+       ns->keys = os_new(tr->sa, (destroy_fptr) &key_destroy, isTempSchema(s), 
true, true, store);
+       ns->idxs = os_new(tr->sa, (destroy_fptr) &idx_destroy, isTempSchema(s), 
true, true, store);
+       ns->triggers = os_new(tr->sa, (destroy_fptr) &trigger_destroy, 
isTempSchema(s), true, true, store);
+       ns->parts = os_new(tr->sa, (destroy_fptr) &part_destroy, 
isTempSchema(s), false, false, store);
 
        /* table_dup will dup keys, idxs, triggers and parts */
        struct os_iter oi;
@@ -4349,8 +4349,6 @@ sql_trans_create_func(sql_trans *tr, sql
        int number = 0, ftype = (int) type, flang = (int) lang;
        bit se;
 
-       if (os_has_changes(s->funcs, tr))
-               return NULL;
        sql_func *t = SA_ZNEW(tr->sa, sql_func);
        base_init(tr->sa, &t->base, next_oid(tr->store), TR_NEW, func);
        t->base.new = 1;
@@ -4521,14 +4519,14 @@ sql_trans_create_schema(sql_trans *tr, c
        s->auth_id = auth_id;
        s->owner = owner;
        s->system = FALSE;
-       s->tables = os_new(tr->sa, (destroy_fptr) &table_destroy, 
isTempSchema(s), true, store);
-       s->types = os_new(tr->sa, (destroy_fptr) &type_destroy, 
isTempSchema(s), true, store);
-       s->funcs = os_new(tr->sa, (destroy_fptr) &func_destroy, 
isTempSchema(s), false, store);
-       s->seqs = os_new(tr->sa, (destroy_fptr) &seq_destroy, isTempSchema(s), 
true, store);
-       s->keys = os_new(tr->sa, (destroy_fptr) &key_destroy, isTempSchema(s), 
true, store);
-       s->idxs = os_new(tr->sa, (destroy_fptr) &idx_destroy, isTempSchema(s), 
true, store);
-       s->triggers = os_new(tr->sa, (destroy_fptr) &trigger_destroy, 
isTempSchema(s), true, store);
-       s->parts = os_new(tr->sa, (destroy_fptr) &part_destroy, 
isTempSchema(s), false, store);
+       s->tables = os_new(tr->sa, (destroy_fptr) &table_destroy, 
isTempSchema(s), true, true, store);
+       s->types = os_new(tr->sa, (destroy_fptr) &type_destroy, 
isTempSchema(s), true, true, store);
+       s->funcs = os_new(tr->sa, (destroy_fptr) &func_destroy, 
isTempSchema(s), false, false, store);
+       s->seqs = os_new(tr->sa, (destroy_fptr) &seq_destroy, isTempSchema(s), 
true, true, store);
+       s->keys = os_new(tr->sa, (destroy_fptr) &key_destroy, isTempSchema(s), 
true, true, store);
+       s->idxs = os_new(tr->sa, (destroy_fptr) &idx_destroy, isTempSchema(s), 
true, true, store);
+       s->triggers = os_new(tr->sa, (destroy_fptr) &trigger_destroy, 
isTempSchema(s), true, true, store);
+       s->parts = os_new(tr->sa, (destroy_fptr) &part_destroy, 
isTempSchema(s), false, false, store);
        s->store = tr->store;
 
        if (store->table_api.table_insert(tr, sysschema, &s->base.id, 
&s->base.name, &s->auth_id, &s->owner, &s->system)) {
@@ -4627,8 +4625,6 @@ sql_trans_add_table(sql_trans *tr, sql_t
        int res = 0;
        sql_table *dup = NULL;
 
-       if (os_has_changes(mt->s->parts, tr)) /* for inserts disallow 
concurrent transactions */
-               return -2;
        /* merge table depends on part table */
        if ((res = sql_trans_create_dependency(tr, pt->base.id, mt->base.id, 
TABLE_DEPENDENCY)))
                return res;
@@ -4673,8 +4669,6 @@ sql_trans_add_range_partition(sql_trans 
 
        vmin = vmax = (ValRecord) {.vtype = TYPE_void,};
 
-       if (os_has_changes(mt->s->parts, tr)) /* for either inserts or updates 
disallow concurrent transactions */
-               return -2;
        if ((res = new_table(tr, mt, &dup)))
                return res;
        mt = dup;
@@ -4805,8 +4799,6 @@ sql_trans_add_value_partition(sql_trans 
        int localtype = tpe.type->localtype, i = 0, res = 0;
        sql_table *dup = NULL;
 
-       if (os_has_changes(mt->s->parts, tr)) /* for either inserts or updates 
disallow concurrent transactions */
-               return -2;
        if ((res = new_table(tr, mt, &dup)))
                return res;
        mt = dup;
@@ -4958,8 +4950,6 @@ sql_trans_set_table_schema(sql_trans *tr
        oid rid;
        int res = 0;
 
-       if (os_has_changes(os->parts, tr)) /* because of merge tables disallow 
if there are changes there */
-               return -2;
        rid = store->table_api.column_find_row(tr, find_sql_column(systable, 
"id"), &t->base.id, NULL);
        assert(!is_oid_nil(rid));
        if ((res = store->table_api.column_update_value(tr, 
find_sql_column(systable, "schema_id"), rid, &(ns->base.id))))
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to