Changeset: 5c6184a59d69 for MonetDB URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=5c6184a59d69 Modified Files: sql/storage/objectset.c Branch: default Log Message:
Clean up remaining known synchronicity issues in objectset. diffs (truncated from 499 to 300 lines): diff --git a/sql/storage/objectset.c b/sql/storage/objectset.c --- a/sql/storage/objectset.c +++ b/sql/storage/objectset.c @@ -14,11 +14,17 @@ struct versionhead ; -#define active (0) -#define under_destruction (1<<1) -#define block_destruction (1<<2) -#define deleted (1<<3) -#define rollbacked (1<<4) +#define active (0) +#define under_destruction (1<<1) +#define block_destruction (1<<2) +#define deleted (1<<3) +#define rollbacked (1<<4) + +/* This objectversion owns its associated versionhead. + * When this objectversion gets destroyed, + * the cleanup procedure should also destroy the associated (name|id) based versionhead.*/ +#define name_based_versionhead_owner (1<<5) +#define id_based_versionhead_owner (1<<6) typedef struct objectversion { ulng ts; @@ -40,7 +46,6 @@ typedef struct versionhead { objectversion* ov; } versionhead ; -// TODO: this might be moved to the objectversion struct itself. typedef struct RW_lock { //readers-writer lock int reader_cnt; MT_Lock readers_lock; @@ -102,28 +107,20 @@ unlock_writer(objectset* os) MT_lock_unset(&os->rw_lock.general_lock); } +static bte os_atmc_get_state(objectversion *ov) { + bte state = (bte) ATOMIC_GET(&ov->state); + return state; +} + +static void os_atmc_set_state(objectversion *ov, bte state) { + ATOMIC_SET(&ov->state, state); +} + static versionhead * find_id(objectset *os, sqlid id) { if (os) { - lock_writer(os); - if ((!os->id_map || os->id_map->size*16 < os->id_based_cnt) && os->id_based_cnt > HASH_MIN_SIZE && os->sa) { - hash_destroy(os->id_map); - os->id_map = hash_new(os->sa, os->id_based_cnt, (fkeyvalue)&os_id_key); - if (os->id_map == NULL) { - unlock_writer(os); - return NULL; - } - - for (versionhead *n = os->id_based_h; n; n = n->next ) { - int key = os_id_key(n); - - if (hash_add(os->id_map, key, n) == NULL) { - unlock_writer(os); - return NULL; - } - } - } + lock_reader(os); if (os->id_map) { int key = (int) BATatoms[TYPE_int].atomHash(&id); sql_hash_e *he = os->id_map->buckets[key&(os->id_map->size-1)]; @@ -132,23 +129,26 @@ find_id(objectset *os, sqlid id) versionhead *n = he->value; if (n && n->ov->b->id == id) { - unlock_writer(os); + unlock_reader(os); return n; } } - unlock_writer(os); + unlock_reader(os); return NULL; } - unlock_writer(os); + for (versionhead *n = os->id_based_h; n; n = n->next) { objectversion *ov = n->ov; /* check if ids match */ if (id == ov->b->id) { + unlock_reader(os); return n; } } } + + unlock_reader(os); return NULL; } @@ -179,11 +179,11 @@ node_destroy(objectset *os, sqlstore *st } static versionhead * -os_remove_name_based_chain(objectset *os, sqlstore *store, versionhead *n) +os_remove_name_based_chain(objectset *os, objectversion* ov) { - assert(n); + lock_writer(os); + versionhead *n = ov->name_based_head; versionhead *p = os->name_based_h; - if (p != n) while (p && p->next != n) p = p->next; @@ -201,19 +201,23 @@ os_remove_name_based_chain(objectset *os if (n == os->name_based_t) os->name_based_t = p; - lock_writer(os); if (os->name_map && n) hash_delete(os->name_map, n); + + os->name_based_cnt--; unlock_writer(os); - node_destroy(os, store, n); + bte state = os_atmc_get_state(ov); + state |= name_based_versionhead_owner; + os_atmc_set_state(ov, state); return p; } static versionhead * -os_remove_id_based_chain(objectset *os, sqlstore *store, versionhead *n) +os_remove_id_based_chain(objectset *os, objectversion* ov) { - assert(n); + lock_writer(os); + versionhead *n = ov->id_based_head; versionhead *p = os->id_based_h; if (p != n) @@ -233,12 +237,15 @@ os_remove_id_based_chain(objectset *os, if (n == os->id_based_t) os->id_based_t = p; - lock_writer(os); if (os->id_map && n) hash_delete(os->id_map, n); + + os->name_based_cnt--; unlock_writer(os); - node_destroy(os, store, n); + bte state = os_atmc_get_state(ov); + state |= id_based_versionhead_owner; + os_atmc_set_state(ov, state); return p; } @@ -255,11 +262,34 @@ node_create(sql_allocator *sa, objectver return n; } +static int +os_name_key(versionhead *n) +{ + return hash_key(n->ov->b->name); +} + static objectset * - os_append_node_name(objectset *os, versionhead *n) { lock_writer(os); + if ((!os->name_map || os->name_map->size*16 < os->name_based_cnt) && os->name_based_cnt > HASH_MIN_SIZE && os->sa) { + hash_destroy(os->name_map); + os->name_map = hash_new(os->sa, os->name_based_cnt, (fkeyvalue)& os_name_key); + if (os->name_map == NULL) { + unlock_writer(os); + return NULL; + } + + for (versionhead *n = os->name_based_h; n; n = n->next ) { + int key = os_name_key(n); + + if (hash_add(os->name_map, key, n) == NULL) { + unlock_writer(os); + return NULL; + } + } + } + if (os->name_map) { int key = os->name_map->key(n); @@ -268,6 +298,7 @@ os_append_node_name(objectset *os, versi return NULL; } } + if (os->name_based_t) { os->name_based_t->next = n; } else { @@ -275,8 +306,8 @@ os_append_node_name(objectset *os, versi } n->prev = os->name_based_t; // aka the double linked list. os->name_based_t = n; + os->name_based_cnt++; unlock_writer(os); - os->name_based_cnt++; return os; } @@ -301,14 +332,31 @@ static objectset * os_append_node_id(objectset *os, versionhead *n) { lock_writer(os); + if ((!os->id_map || (os->id_map->size*16 < os->id_based_cnt && os->id_based_cnt > HASH_MIN_SIZE)) && os->sa) { + hash_destroy(os->id_map); + os->id_map = hash_new(os->sa, os->id_based_cnt, (fkeyvalue)&os_id_key); + if (os->id_map == NULL) { + unlock_writer(os); + return NULL; + } + for (versionhead *n = os->id_based_h; n; n = n->next ) { + int key = os_id_key(n); + + if (hash_add(os->id_map, key, n) == NULL) { + unlock_writer(os); + return NULL; + } + } + } + if (os->id_map) { int key = os->id_map->key(n); - if (hash_add(os->id_map, key, n) == NULL) { unlock_writer(os); return NULL; } } + if (os->id_based_t) { os->id_based_t->next = n; } else { @@ -316,8 +364,8 @@ os_append_node_id(objectset *os, version } n->prev = os->id_based_t; // aka the double linked list. os->id_based_t = n; + os->id_based_cnt++; unlock_writer(os); - os->id_based_cnt++; return os; } @@ -342,21 +390,23 @@ static versionhead * find_name(objectset static void objectversion_destroy(sqlstore *store, objectset* os, objectversion *ov) { + + bte state = os_atmc_get_state(ov); + + if (state & name_based_versionhead_owner) { + node_destroy(ov->os, store, ov->name_based_head); + } + + if (state & id_based_versionhead_owner) { + node_destroy(ov->os, store, ov->id_based_head); + } + if (os->destroy) os->destroy(store, ov->b); _DELETE(ov); } -static bte os_atmc_get_state(objectversion *ov) { - bte state = (bte) ATOMIC_GET(&ov->state); - return state; -} - -static void os_atmc_set_state(objectversion *ov, bte state) { - ATOMIC_SET(&ov->state, state); -} - static inline objectversion* get_name_based_older_locked(objectversion* ov) { lock_reader(ov->os); @@ -414,7 +464,7 @@ static void else if (!name_based_older) { // this is a terminal node. i.e. this objectversion does not have name based committed history if (ov->name_based_head) // The oposite can happen during an early conflict in os_add or os_del. - os_remove_name_based_chain(ov->os, store, ov->name_based_head); + os_remove_name_based_chain(ov->os, ov); _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list