Changeset: 16f2b566e6bc for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=16f2b566e6bc
Modified Files:
        sql/storage/objectset.c
Branch: default
Log Message:

Use readers-writer-lock to synchronize access to *_older pointers.


diffs (207 lines):

diff --git a/sql/storage/objectset.c b/sql/storage/objectset.c
--- a/sql/storage/objectset.c
+++ b/sql/storage/objectset.c
@@ -40,11 +40,19 @@ typedef struct versionhead  {
     objectversion* ov;
 } versionhead ;
 
+// TODO: this might be moved to the objectversion struct itself.
+typedef struct RW_lock { //readers-writer lock: many concurrent readers or one writer, built from two MT_Locks and a counter
+       int reader_cnt; /* number of readers currently inside; only modified while readers_lock is held */
+       MT_Lock readers_lock; /* guards reader_cnt */
+       MT_Lock general_lock; /* taken by a writer, or by the first reader in and released by the last reader out */
+} RW_lock;
+
 typedef struct objectset {
        int refcnt;
        sql_allocator *sa;
        destroy_fptr destroy;
        MT_Lock ht_lock;        /* latch protecting ht */
+       RW_lock rw_lock;        /*readers-writer lock to protect the links (chains) in the objectversion chain.*/
        versionhead  *name_based_h;
        versionhead  *name_based_t;
        versionhead  *id_based_h;
@@ -318,6 +326,54 @@ static void os_atmc_set_state(objectvers
        ATOMIC_SET(&ov->state, state);
 }
 
+static inline void /* Enter a read-side section on os->rw_lock; every call must be paired with unlock_reader(os). */
+lock_reader(objectset* os)
+{
+       MT_lock_set(&os->rw_lock.readers_lock); /* serialize updates of reader_cnt */
+       if (1 == ++os->rw_lock.reader_cnt) { /* first reader in: shut out writers */
+               MT_lock_set(&os->rw_lock.general_lock); /* waits here, still holding readers_lock, while a writer holds general_lock */
+       }
+       MT_lock_unset(&os->rw_lock.readers_lock);
+}
+
+static inline void /* Leave a read-side section previously entered with lock_reader(os). */
+unlock_reader(objectset* os)
+{
+       MT_lock_set(&os->rw_lock.readers_lock); /* serialize updates of reader_cnt */
+       if (0 == --os->rw_lock.reader_cnt) { /* last reader out: let writers in again */
+               MT_lock_unset(&os->rw_lock.general_lock); /* NOTE(review): this may run on a different thread than the one that took general_lock in lock_reader -- confirm MT_Lock allows unlocking by a non-owner */
+       }
+       MT_lock_unset(&os->rw_lock.readers_lock);
+}
+
+static inline void /* Acquire exclusive (writer) access on os->rw_lock; pair with unlock_writer(os). */
+lock_writer(objectset* os)
+{
+       MT_lock_set(&os->rw_lock.general_lock); /* same lock the first reader takes, so this blocks while readers or another writer hold it */
+}
+
+static inline void /* Release the exclusive (writer) access taken with lock_writer(os). */
+unlock_writer(objectset* os)
+{
+       MT_lock_unset(&os->rw_lock.general_lock); /* readers and other writers may proceed after this */
+}
+
+static inline objectversion* /* Return ov->name_based_older, read while holding the reader side of ov->os's rw_lock. */
+get_name_based_older_locked(objectversion* ov) {
+       lock_reader(ov->os);
+       objectversion* name_based_older = ov->name_based_older; /* snapshot of the link; may be NULL */
+       unlock_reader(ov->os); /* the lock covers only the pointer read, not later use of the pointee -- NOTE(review): confirm callers keep the older version alive */
+       return name_based_older;
+}
+
+static inline objectversion* /* Return ov->id_based_older, read while holding the reader side of ov->os's rw_lock. */
+get_id_based_older_locked(objectversion* ov) {
+       lock_reader(ov->os);
+       objectversion* id_based_older = ov->id_based_older; /* snapshot of the link; may be NULL */
+       unlock_reader(ov->os); /* the lock covers only the pointer read, not later use of the pointee -- NOTE(review): confirm callers keep the older version alive */
+       return id_based_older;
+}
+
 static void
 _os_rollback(objectversion *ov, sqlstore *store)
 {
@@ -333,8 +389,12 @@ static void
 
        bte state_older;
 
-       // TODO ATOMIC GET
-       objectversion* name_based_older = ov->name_based_older;
+       /*
+        * We have to use the readers-writer lock here,
+        * since the pointer containing the address of the older objectversion might be concurrently overwritten if the older itself has just been put in the under_destruction state.
+        */
+       objectversion* name_based_older = get_name_based_older_locked(ov);
+
        if (name_based_older && !((state_older= 
os_atmc_get_state(name_based_older)) & rollbacked)) {
                if (ov->ts != name_based_older->ts) {
                        // older is last committed state or belongs to parent transaction.
@@ -358,13 +418,15 @@ static void
                        os_remove_name_based_chain(ov->os, store, 
ov->name_based_head);
        }
 
-       // TODO ATOMIC GET
-       objectversion* id_based_older = ov->id_based_older;
+       /*
+        * We have to use the readers-writer lock here,
+        * since the pointer containing the address of the older objectversion might be concurrently overwritten if the older itself has just been put in the under_destruction state.
+        */
+       objectversion* id_based_older = get_id_based_older_locked(ov);
        if (id_based_older && !((state_older= 
os_atmc_get_state(id_based_older)) & rollbacked)) {
                if (ov->ts != id_based_older->ts) {
                        // older is last committed state or belongs to parent transaction.
                        // In any case, we restore versionhead pointer to that.
-                       // TODO START ATOMIC SET
 
                        ATOMIC_BASE_TYPE expected_deleted = deleted;
                        if (state_older == active || (state_older == deleted && 
ATOMIC_CAS(&id_based_older->state, &expected_deleted, block_destruction))) {
@@ -383,11 +445,11 @@ static void
        }
 
        if (ov->name_based_newer && !(os_atmc_get_state(ov->name_based_newer) & 
rollbacked)) {
-               _os_rollback(ov->id_based_older, store);
+               _os_rollback(ov->name_based_newer, store);
        }
 
        if (ov->id_based_newer && ov->id_based_newer != ov->name_based_newer && 
!(os_atmc_get_state(ov->id_based_newer) & rollbacked)) {
-               _os_rollback(ov->id_based_older, store);
+               _os_rollback(ov->id_based_newer, store);
        }
 }
 
@@ -409,14 +471,18 @@ try_to_mark_deleted_for_destruction(sqls
                        os_remove_name_based_chain(ov->os, store, 
ov->name_based_head);
                }
                else {
+                       lock_writer(ov->os);
                        ov->name_based_newer->name_based_older = NULL;
+                       unlock_writer(ov->os);
                }
 
                if (!ov->id_based_newer || 
(os_atmc_get_state(ov->id_based_newer) & rollbacked)) {
                        os_remove_id_based_chain(ov->os, store, 
ov->id_based_head);
                }
                else {
+                       lock_writer(ov->os);
                        ov->id_based_newer->id_based_older = NULL;
+                       unlock_writer(ov->os);
                }
 
                ov->ts = store_get_timestamp(store)+1;
@@ -430,12 +496,6 @@ objectversion_destroy_recursive(sqlstore
                        objectversion_destroy_recursive(store, 
ov->id_based_older);
                }
 
-               if (ov->name_based_newer)
-                       ov->name_based_newer->name_based_older=NULL;
-
-               if (ov->id_based_newer)
-                       ov->id_based_newer->id_based_older=NULL;
-
                objectversion_destroy(store, ov->os, ov);
 }
 
@@ -542,6 +602,8 @@ os_new(sql_allocator *sa, destroy_fptr d
        };
        os->destroy = destroy;
        MT_lock_init(&os->ht_lock, "sa_ht_lock");
+       MT_lock_init(&os->rw_lock.readers_lock, "sa_readers_lock");
+       MT_lock_init(&os->rw_lock.general_lock, "sa_general_lock");
 
        return os;
 }
@@ -559,6 +621,8 @@ os_destroy(objectset *os, sql_store stor
        if (--os->refcnt > 0)
                return;
        MT_lock_destroy(&os->ht_lock);
+       MT_lock_destroy(&os->rw_lock.readers_lock);
+       MT_lock_destroy(&os->rw_lock.general_lock);
        versionhead* n=os->id_based_h;
        while(n) {
                objectversion *ov = n->ov;
@@ -688,7 +752,7 @@ get_valid_object_name(sql_trans *tr, obj
                if (ov->ts == tr->tid || (tr->parent && 
tr_version_of_parent(tr, ov->ts)) || ov->ts < tr->ts)
                        return ov;
                else
-                       ov = ov->name_based_older;
+                       ov = get_name_based_older_locked(ov);
        }
        return ov;
 }
@@ -700,7 +764,7 @@ get_valid_object_id(sql_trans *tr, objec
                if (ov->ts == tr->tid || (tr->parent && 
tr_version_of_parent(tr, ov->ts))  || ov->ts < tr->ts)
                        return ov;
                else
-                       ov = ov->id_based_older;
+                       ov = get_id_based_older_locked(ov);
        }
        return ov;
 }
@@ -762,10 +826,8 @@ os_add_name_based(objectset *os, struct 
 static int
 os_add_id_based(objectset *os, struct sql_trans *tr, sqlid id, objectversion 
*ov) {
        versionhead  *id_based_node;
-       if (ov->name_based_older && ov->name_based_older->b->id == id)
-               id_based_node = ov->name_based_older->id_based_head;
-       else // Previous id based objectversion is of a different name, so now we do have to perform an extensive look up
-               id_based_node = find_id(os, id);
+
+       id_based_node = find_id(os, id);
 
        if (id_based_node) {
                objectversion *co = id_based_node->ov;
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to