Changeset: 16f2b566e6bc for MonetDB URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=16f2b566e6bc Modified Files: sql/storage/objectset.c Branch: default Log Message:
Use readers-writer-lock to synchronize access to *_older pointers. diffs (207 lines): diff --git a/sql/storage/objectset.c b/sql/storage/objectset.c --- a/sql/storage/objectset.c +++ b/sql/storage/objectset.c @@ -40,11 +40,19 @@ typedef struct versionhead { objectversion* ov; } versionhead ; +// TODO: this might be moved to the objectversion struct itself. +typedef struct RW_lock { //readers-writer lock + int reader_cnt; + MT_Lock readers_lock; + MT_Lock general_lock; +} RW_lock; + typedef struct objectset { int refcnt; sql_allocator *sa; destroy_fptr destroy; MT_Lock ht_lock; /* latch protecting ht */ + RW_lock rw_lock; /*readers-writer lock to protect the links (chains) in the objectversion chain.*/ versionhead *name_based_h; versionhead *name_based_t; versionhead *id_based_h; @@ -318,6 +326,54 @@ static void os_atmc_set_state(objectvers ATOMIC_SET(&ov->state, state); } +static inline void +lock_reader(objectset* os) +{ + MT_lock_set(&os->rw_lock.readers_lock); + if (1 == ++os->rw_lock.reader_cnt) { + MT_lock_set(&os->rw_lock.general_lock); + } + MT_lock_unset(&os->rw_lock.readers_lock); +} + +static inline void +unlock_reader(objectset* os) +{ + MT_lock_set(&os->rw_lock.readers_lock); + if (0 == --os->rw_lock.reader_cnt) { + MT_lock_unset(&os->rw_lock.general_lock); + } + MT_lock_unset(&os->rw_lock.readers_lock); +} + +static inline void +lock_writer(objectset* os) +{ + MT_lock_set(&os->rw_lock.general_lock); +} + +static inline void +unlock_writer(objectset* os) +{ + MT_lock_unset(&os->rw_lock.general_lock); +} + +static inline objectversion* +get_name_based_older_locked(objectversion* ov) { + lock_reader(ov->os); + objectversion* name_based_older = ov->name_based_older; + unlock_reader(ov->os); + return name_based_older; +} + +static inline objectversion* +get_id_based_older_locked(objectversion* ov) { + lock_reader(ov->os); + objectversion* id_based_older = ov->id_based_older; + unlock_reader(ov->os); + return id_based_older; +} + static void 
_os_rollback(objectversion *ov, sqlstore *store) { @@ -333,8 +389,12 @@ static void bte state_older; - // TODO ATOMIC GET - objectversion* name_based_older = ov->name_based_older; + /* + * We have to use the readers-writer lock here, + * since the pointer containing the address of the older objectversion might be concurrently overwritten if the older itself has just been put in the under_destruction state. + */ + objectversion* name_based_older = get_name_based_older_locked(ov); + if (name_based_older && !((state_older= os_atmc_get_state(name_based_older)) & rollbacked)) { if (ov->ts != name_based_older->ts) { // older is last committed state or belongs to parent transaction. @@ -358,13 +418,15 @@ static void os_remove_name_based_chain(ov->os, store, ov->name_based_head); } - // TODO ATOMIC GET - objectversion* id_based_older = ov->id_based_older; + /* + * We have to use the readers-writer lock here, + * since the pointer containing the address of the older objectversion might be concurrently overwritten if the older itself has just been put in the under_destruction state. + */ + objectversion* id_based_older = get_id_based_older_locked(ov); if (id_based_older && !((state_older= os_atmc_get_state(id_based_older)) & rollbacked)) { if (ov->ts != id_based_older->ts) { // older is last committed state or belongs to parent transaction. // In any case, we restore versionhead pointer to that. 
- // TODO START ATOMIC SET ATOMIC_BASE_TYPE expected_deleted = deleted; if (state_older == active || (state_older == deleted && ATOMIC_CAS(&id_based_older->state, &expected_deleted, block_destruction))) { @@ -383,11 +445,11 @@ static void } if (ov->name_based_newer && !(os_atmc_get_state(ov->name_based_newer) & rollbacked)) { - _os_rollback(ov->id_based_older, store); + _os_rollback(ov->name_based_newer, store); } if (ov->id_based_newer && ov->id_based_newer != ov->name_based_newer && !(os_atmc_get_state(ov->id_based_newer) & rollbacked)) { - _os_rollback(ov->id_based_older, store); + _os_rollback(ov->id_based_newer, store); } } @@ -409,14 +471,18 @@ try_to_mark_deleted_for_destruction(sqls os_remove_name_based_chain(ov->os, store, ov->name_based_head); } else { + lock_writer(ov->os); ov->name_based_newer->name_based_older = NULL; + unlock_writer(ov->os); } if (!ov->id_based_newer || (os_atmc_get_state(ov->id_based_newer) & rollbacked)) { os_remove_id_based_chain(ov->os, store, ov->id_based_head); } else { + lock_writer(ov->os); ov->id_based_newer->id_based_older = NULL; + unlock_writer(ov->os); } ov->ts = store_get_timestamp(store)+1; @@ -430,12 +496,6 @@ objectversion_destroy_recursive(sqlstore objectversion_destroy_recursive(store, ov->id_based_older); } - if (ov->name_based_newer) - ov->name_based_newer->name_based_older=NULL; - - if (ov->id_based_newer) - ov->id_based_newer->id_based_older=NULL; - objectversion_destroy(store, ov->os, ov); } @@ -542,6 +602,8 @@ os_new(sql_allocator *sa, destroy_fptr d }; os->destroy = destroy; MT_lock_init(&os->ht_lock, "sa_ht_lock"); + MT_lock_init(&os->rw_lock.readers_lock, "sa_readers_lock"); + MT_lock_init(&os->rw_lock.general_lock, "sa_general_lock"); return os; } @@ -559,6 +621,8 @@ os_destroy(objectset *os, sql_store stor if (--os->refcnt > 0) return; MT_lock_destroy(&os->ht_lock); + MT_lock_destroy(&os->rw_lock.readers_lock); + MT_lock_destroy(&os->rw_lock.general_lock); versionhead* n=os->id_based_h; while(n) { 
objectversion *ov = n->ov; @@ -688,7 +752,7 @@ get_valid_object_name(sql_trans *tr, obj if (ov->ts == tr->tid || (tr->parent && tr_version_of_parent(tr, ov->ts)) || ov->ts < tr->ts) return ov; else - ov = ov->name_based_older; + ov = get_name_based_older_locked(ov); } return ov; } @@ -700,7 +764,7 @@ get_valid_object_id(sql_trans *tr, objec if (ov->ts == tr->tid || (tr->parent && tr_version_of_parent(tr, ov->ts)) || ov->ts < tr->ts) return ov; else - ov = ov->id_based_older; + ov = get_id_based_older_locked(ov); } return ov; } @@ -762,10 +826,8 @@ os_add_name_based(objectset *os, struct static int os_add_id_based(objectset *os, struct sql_trans *tr, sqlid id, objectversion *ov) { versionhead *id_based_node; - if (ov->name_based_older && ov->name_based_older->b->id == id) - id_based_node = ov->name_based_older->id_based_head; - else // Previous id based objectversion is of a different name, so now we do have to perform an extensive look up - id_based_node = find_id(os, id); + + id_based_node = find_id(os, id); if (id_based_node) { objectversion *co = id_based_node->ov; _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list