Changeset: 5c6184a59d69 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=5c6184a59d69
Modified Files:
        sql/storage/objectset.c
Branch: default
Log Message:

Clean up remaining known synchronization issues in objectset.


diffs (truncated from 499 to 300 lines):

diff --git a/sql/storage/objectset.c b/sql/storage/objectset.c
--- a/sql/storage/objectset.c
+++ b/sql/storage/objectset.c
@@ -14,11 +14,17 @@
 
 struct versionhead ;
 
-#define active                                 (0)
-#define under_destruction              (1<<1)
-#define block_destruction              (1<<2)
-#define deleted                                        (1<<3)
-#define rollbacked                             (1<<4)
+#define active                                                 (0)
+#define under_destruction                              (1<<1)
+#define block_destruction                              (1<<2)
+#define deleted                                                        (1<<3)
+#define rollbacked                                             (1<<4)
+
+/* This objectversion owns its associated versionhead.
+ * When this objectversion gets destroyed,
+ * the cleanup procedure should also destroy the associated (name|id) based versionhead. */
+#define name_based_versionhead_owner   (1<<5)
+#define id_based_versionhead_owner             (1<<6)
 
 typedef struct objectversion {
        ulng ts;
@@ -40,7 +46,6 @@ typedef struct versionhead  {
     objectversion* ov;
 } versionhead ;
 
-// TODO: this might be moved to the objectversion struct itself.
 typedef struct RW_lock { //readers-writer lock
        int reader_cnt;
        MT_Lock readers_lock;
@@ -102,28 +107,20 @@ unlock_writer(objectset* os)
        MT_lock_unset(&os->rw_lock.general_lock);
 }
 
+static bte os_atmc_get_state(objectversion *ov) {
+       bte state = (bte) ATOMIC_GET(&ov->state);
+       return state;
+}
+
+static void os_atmc_set_state(objectversion *ov, bte state) {
+       ATOMIC_SET(&ov->state, state);
+}
+
 static versionhead  *
 find_id(objectset *os, sqlid id)
 {
        if (os) {
-               lock_writer(os);
-               if ((!os->id_map || os->id_map->size*16 < os->id_based_cnt) && 
os->id_based_cnt > HASH_MIN_SIZE && os->sa) {
-                       hash_destroy(os->id_map);
-                       os->id_map = hash_new(os->sa, os->id_based_cnt, 
(fkeyvalue)&os_id_key);
-                       if (os->id_map == NULL) {
-                               unlock_writer(os);
-                               return NULL;
-                       }
-
-                       for (versionhead  *n = os->id_based_h; n; n = n->next ) 
{
-                               int key = os_id_key(n);
-
-                               if (hash_add(os->id_map, key, n) == NULL) {
-                                       unlock_writer(os);
-                                       return NULL;
-                               }
-                       }
-               }
+               lock_reader(os);
                if (os->id_map) {
                        int key = (int) BATatoms[TYPE_int].atomHash(&id);
                        sql_hash_e *he = 
os->id_map->buckets[key&(os->id_map->size-1)];
@@ -132,23 +129,26 @@ find_id(objectset *os, sqlid id)
                                versionhead  *n = he->value;
 
                                if (n && n->ov->b->id == id) {
-                                       unlock_writer(os);
+                                       unlock_reader(os);
                                        return n;
                                }
                        }
-                       unlock_writer(os);
+                       unlock_reader(os);
                        return NULL;
                }
-               unlock_writer(os);
+
                for (versionhead  *n = os->id_based_h; n; n = n->next) {
                        objectversion *ov = n->ov;
 
                        /* check if ids match */
                        if (id == ov->b->id) {
+                               unlock_reader(os);
                                return n;
                        }
                }
+               unlock_reader(os); /* no match: release here, inside if (os), since the reader lock is only taken when os is non-NULL */
        }
+
        return NULL;
 }
 
@@ -179,11 +179,11 @@ node_destroy(objectset *os, sqlstore *st
 }
 
 static versionhead  *
-os_remove_name_based_chain(objectset *os, sqlstore *store, versionhead  *n)
+os_remove_name_based_chain(objectset *os, objectversion* ov)
 {
-       assert(n);
+       lock_writer(os);
+       versionhead  *n = ov->name_based_head;
        versionhead  *p = os->name_based_h;
-
        if (p != n)
                while (p && p->next != n)
                        p = p->next;
@@ -201,19 +201,23 @@ os_remove_name_based_chain(objectset *os
        if (n == os->name_based_t)
                os->name_based_t = p;
 
-       lock_writer(os);
        if (os->name_map && n)
                hash_delete(os->name_map, n);
+
+       os->name_based_cnt--;
        unlock_writer(os);
 
-       node_destroy(os, store, n);
+       bte state = os_atmc_get_state(ov);
+       state |= name_based_versionhead_owner;
+       os_atmc_set_state(ov, state);
        return p;
 }
 
 static versionhead  *
-os_remove_id_based_chain(objectset *os, sqlstore *store, versionhead  *n)
+os_remove_id_based_chain(objectset *os, objectversion* ov)
 {
-       assert(n);
+       lock_writer(os);
+       versionhead  *n = ov->id_based_head;
        versionhead  *p = os->id_based_h;
 
        if (p != n)
@@ -233,12 +237,15 @@ os_remove_id_based_chain(objectset *os, 
        if (n == os->id_based_t)
                os->id_based_t = p;
 
-       lock_writer(os);
        if (os->id_map && n)
                hash_delete(os->id_map, n);
+
+       os->id_based_cnt--; /* this is the id-based chain: adjust its own counter, not name_based_cnt */
        unlock_writer(os);
 
-       node_destroy(os, store, n);
+       bte state = os_atmc_get_state(ov); /* n is only unlinked here, not destroyed */
+       state |= id_based_versionhead_owner; /* objectversion_destroy() calls node_destroy on ov->id_based_head when this flag is set */
+       os_atmc_set_state(ov, state);
        return p;
 }
 
@@ -255,11 +262,34 @@ node_create(sql_allocator *sa, objectver
        return n;
 }
 
+static int
+os_name_key(versionhead  *n)
+{
+       return hash_key(n->ov->b->name);
+}
+
 static objectset *
-
 os_append_node_name(objectset *os, versionhead  *n)
 {
        lock_writer(os);
+       if ((!os->name_map || os->name_map->size*16 < os->name_based_cnt) && 
os->name_based_cnt > HASH_MIN_SIZE && os->sa) {
+               hash_destroy(os->name_map);
+               os->name_map = hash_new(os->sa, os->name_based_cnt, 
(fkeyvalue)& os_name_key);
+               if (os->name_map == NULL) {
+                       unlock_writer(os);
+                       return NULL;
+               }
+
+               for (versionhead  *n = os->name_based_h; n; n = n->next ) {
+                       int key = os_name_key(n);
+
+                       if (hash_add(os->name_map, key, n) == NULL) {
+                               unlock_writer(os);
+                               return NULL;
+                       }
+               }
+       }
+
        if (os->name_map) {
                int key = os->name_map->key(n);
 
@@ -268,6 +298,7 @@ os_append_node_name(objectset *os, versi
                        return NULL;
                }
        }
+
        if (os->name_based_t) {
                os->name_based_t->next = n;
        } else {
@@ -275,8 +306,8 @@ os_append_node_name(objectset *os, versi
        }
        n->prev = os->name_based_t; // aka the double linked list.
        os->name_based_t = n;
+       os->name_based_cnt++;
        unlock_writer(os);
-       os->name_based_cnt++;
        return os;
 }
 
@@ -301,14 +332,31 @@ static objectset *
 os_append_node_id(objectset *os, versionhead  *n)
 {
        lock_writer(os);
+       if ((!os->id_map || (os->id_map->size*16 < os->id_based_cnt && 
os->id_based_cnt > HASH_MIN_SIZE)) && os->sa) {
+               hash_destroy(os->id_map);
+               os->id_map = hash_new(os->sa, os->id_based_cnt, 
(fkeyvalue)&os_id_key);
+               if (os->id_map == NULL) {
+                       unlock_writer(os);
+                       return NULL;
+               }
+               for (versionhead  *n = os->id_based_h; n; n = n->next ) {
+                       int key = os_id_key(n);
+
+                       if (hash_add(os->id_map, key, n) == NULL) {
+                               unlock_writer(os);
+                               return NULL;
+                       }
+               }
+       }
+
        if (os->id_map) {
                int key = os->id_map->key(n);
-
                if (hash_add(os->id_map, key, n) == NULL) {
                        unlock_writer(os);
                        return NULL;
                }
        }
+
        if (os->id_based_t) {
                os->id_based_t->next = n;
        } else {
@@ -316,8 +364,8 @@ os_append_node_id(objectset *os, version
        }
        n->prev = os->id_based_t; // aka the double linked list.
        os->id_based_t = n;
+       os->id_based_cnt++;
        unlock_writer(os);
-       os->id_based_cnt++;
        return os;
 }
 
@@ -342,21 +390,23 @@ static versionhead * find_name(objectset
 static void
 objectversion_destroy(sqlstore *store, objectset* os, objectversion *ov)
 {
+
+       bte state = os_atmc_get_state(ov);
+
+       if (state & name_based_versionhead_owner) {
+               node_destroy(ov->os, store, ov->name_based_head);
+       }
+
+       if (state & id_based_versionhead_owner) {
+               node_destroy(ov->os, store, ov->id_based_head);
+       }
+
        if (os->destroy)
                os->destroy(store, ov->b);
 
        _DELETE(ov);
 }
 
-static bte os_atmc_get_state(objectversion *ov) {
-       bte state = (bte) ATOMIC_GET(&ov->state);
-       return state;
-}
-
-static void os_atmc_set_state(objectversion *ov, bte state) {
-       ATOMIC_SET(&ov->state, state);
-}
-
 static inline objectversion*
 get_name_based_older_locked(objectversion* ov) {
        lock_reader(ov->os);
@@ -414,7 +464,7 @@ static void
        else if (!name_based_older) {
                // this is a terminal node. i.e. this objectversion does not 
have name based committed history
                if (ov->name_based_head) // The oposite can happen during an 
early conflict in os_add or os_del.
-                       os_remove_name_based_chain(ov->os, store, 
ov->name_based_head);
+                       os_remove_name_based_chain(ov->os, ov);
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to