Changeset: 92c641b07606 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/92c641b07606
Modified Files:
        sql/storage/bat/bat_storage.c
        sql/storage/bat/bat_storage.h
Branch: Jun2023
Log Message:

Turn segment next pointer into an atomic.
This seems to fix the occasional problem that we get a crash when
concurrent work (changes and lookups) are done on segment chains that
are due to the compiler/CPU doing things out of the written order (which
they may).


diffs (truncated from 371 to 300 lines):

diff --git a/sql/storage/bat/bat_storage.c b/sql/storage/bat/bat_storage.c
--- a/sql/storage/bat/bat_storage.c
+++ b/sql/storage/bat/bat_storage.c
@@ -121,6 +121,7 @@ tc_gc_seg( sql_store Store, sql_change *
        if (s->ts <= oldest) {
                while(s) {
                        segment *n = s->prev;
+                       ATOMIC_PTR_DESTROY(&s->next);
                        _DELETE(s);
                        s = n;
                }
@@ -158,10 +159,10 @@ new_segment(segment *o, sql_trans *tr, s
                        n->start = 0;
                        n->end = cnt;
                }
-               n->next = NULL;
+               ATOMIC_PTR_INIT(&n->next, NULL);
                n->prev = NULL;
                if (o)
-                       o->next = n;
+                       ATOMIC_PTR_SET(&o->next, n);
        }
        return n;
 }
@@ -197,19 +198,19 @@ split_segment(segments *segs, segment *o
                 * inserted before */
                n->start = o->start;
                n->end = n->start + cnt;
-               n->next = o;
+               ATOMIC_PTR_INIT(&n->next, o);
                if (segs->h == o)
                        segs->h = n;
                if (p)
-                       p->next = n;
+                       ATOMIC_PTR_SET(&p->next, n);
                o->start = n->end;
        } else if (start+cnt == o->end) {
                /* 2-way split: o remains first part of segment, new one is
                 * added after */
                n->start = o->end - cnt;
                n->end = o->end;
-               n->next = o->next;
-               o->next = n;
+               ATOMIC_PTR_INIT(&n->next, ATOMIC_PTR_GET(&o->next));
+               ATOMIC_PTR_SET(&o->next, n);
                if (segs->t == o)
                        segs->t = n;
                o->end = n->start;
@@ -221,15 +222,16 @@ split_segment(segments *segs, segment *o
                        GDKfree(n);
                        return NULL;
                }
-               n->next = n2;
+               ATOMIC_PTR_INIT(&n->next, n2);
                n->start = start;
                n->end = start + cnt;
                *n2 = *o;
+               ATOMIC_PTR_INIT(&n2->next, ATOMIC_PTR_GET(&o->next));
                n2->start = n->end;
                n2->prev = NULL;
                if (segs->t == o)
                        segs->t = n2;
-               o->next = n;
+               ATOMIC_PTR_SET(&o->next, n);
                o->end = start;
        }
        return n;
@@ -239,7 +241,7 @@ static void
 rollback_segments(segments *segs, sql_trans *tr, sql_change *change, ulng 
oldest)
 {
        segment *cur = segs->h, *seg = NULL;
-       for (; cur; cur = cur->next) {
+       for (; cur; cur = ATOMIC_PTR_GET(&cur->next)) {
                if (cur->ts == tr->tid) { /* revert */
                        cur->deleted = !cur->deleted || (cur->ts == cur->oldts);
                        cur->ts = cur->oldts==tr->tid?0:cur->oldts; /* need old 
ts */
@@ -251,7 +253,7 @@ rollback_segments(segments *segs, sql_tr
                        } else if (seg->end == cur->start && seg->deleted == 
cur->deleted) {
                                /* merge with previous */
                                seg->end = cur->end;
-                               seg->next = cur->next;
+                               ATOMIC_PTR_SET(&seg->next, 
ATOMIC_PTR_GET(&cur->next));
                                if (cur == segs->t)
                                        segs->t = seg;
                                mark4destroy(cur, change, 
store_get_timestamp(tr->store));
@@ -269,7 +271,7 @@ segs_end_include_deleted( segments *segs
        size_t cnt = 0;
        segment *s = segs->h, *l = NULL;
 
-       for(;s; s = s->next) {
+       for(;s; s = ATOMIC_PTR_GET(&s->next)) {
                if (s->ts == tr->tid || SEG_IS_VALID(s, tr))
                                l = s;
        }
@@ -312,7 +314,7 @@ segments2cs(sql_trans *tr, segments *seg
        uint32_t *restrict dst;
        /* why hashlock ?? */
        MT_rwlock_wrlock(&b->thashlock);
-       for (; s ; s=s->next) {
+       for (; s ; s=ATOMIC_PTR_GET(&s->next)) {
                if (s->start >= nr)
                        break;
                if (s->ts == tr->tid && s->end != s->start) {
@@ -395,7 +397,7 @@ merge_segments(storage *s, sql_trans *tr
 {
        sqlstore* store = tr->store;
        segment *cur = s->segs->h, *seg = NULL;
-       for (; cur; cur = cur->next) {
+       for (; cur; cur = ATOMIC_PTR_GET(&cur->next)) {
                if (cur->ts == tr->tid) {
                        if (!cur->deleted)
                                cur->oldts = 0;
@@ -430,12 +432,13 @@ merge_segments(storage *s, sql_trans *tr
                                /* merge segments */
                                if (merge) {
                                        seg->end = cur->end;
-                                       seg->next = cur->next;
+                                       ATOMIC_PTR_SET(&seg->next, 
ATOMIC_PTR_GET(&cur->next));
                                        if (cur == s->segs->t)
                                                s->segs->t = seg;
-                                       if (commit_ts == oldest)
+                                       if (commit_ts == oldest) {
+                                               ATOMIC_PTR_DESTROY(&cur->next);
                                                _DELETE(cur);
-                                       else
+                                       } else
                                                mark4destroy(cur, change, 
commit_ts);
                                        cur = seg;
                                        continue;
@@ -454,7 +457,7 @@ segments_in_transaction(sql_trans *tr, s
 
        if (seg && s->segs->t->ts == tr->tid)
                return 1;
-       for (; seg ; seg=seg->next) {
+       for (; seg ; seg=ATOMIC_PTR_GET(&seg->next)) {
                if (seg->ts == tr->tid)
                        return 1;
        }
@@ -472,7 +475,7 @@ segs_end( segments *segs, sql_trans *tr,
        if (segs->t && SEG_IS_VALID(segs->t, tr))
                l = s = segs->t;
 
-       for(;s; s = s->next) {
+       for(;s; s = ATOMIC_PTR_GET(&s->next)) {
                if (SEG_IS_VALID(s, tr))
                                l = s;
        }
@@ -573,7 +576,7 @@ count_inserts( segment *s, sql_trans *tr
 {
        size_t cnt = 0;
 
-       for(;s; s = s->next) {
+       for(;s; s = ATOMIC_PTR_GET(&s->next)) {
                if (!s->deleted && s->ts == tr->tid)
                        cnt += s->end - s->start;
        }
@@ -585,10 +588,10 @@ count_deletes_in_range( segment *s, sql_
 {
        size_t cnt = 0;
 
-       for(;s && s->end <= start; s = s->next)
+       for(;s && s->end <= start; s = ATOMIC_PTR_GET(&s->next))
                ;
 
-       for(;s && s->start < end; s = s->next) {
+       for(;s && s->start < end; s = ATOMIC_PTR_GET(&s->next)) {
                if (SEG_IS_DELETED(s, tr)) /* assume aligned s->end and end */
                        cnt += s->end - s->start;
        }
@@ -600,7 +603,7 @@ count_deletes( segment *s, sql_trans *tr
 {
        size_t cnt = 0;
 
-       for(;s; s = s->next) {
+       for(;s; s = ATOMIC_PTR_GET(&s->next)) {
                if (SEG_IS_DELETED(s, tr))
                        cnt += s->end - s->start;
        }
@@ -1043,7 +1046,7 @@ cs_real_update_bats( column_storage *cs,
 static int
 segments_is_append(segment *s, sql_trans *tr, oid rid)
 {
-       for(; s; s=s->next) {
+       for(; s; s=ATOMIC_PTR_GET(&s->next)) {
                if (s->start <= rid && s->end > rid) {
                        if (s->ts == tr->tid && !s->deleted) {
                                return 1;
@@ -1057,7 +1060,7 @@ segments_is_append(segment *s, sql_trans
 static int
 segments_is_deleted(segment *s, sql_trans *tr, oid rid)
 {
-       for(; s; s=s->next) {
+       for(; s; s=ATOMIC_PTR_GET(&s->next)) {
                if (s->start <= rid && s->end > rid) {
                        if (s->ts >= tr->ts && s->deleted) {
                                return 1;
@@ -1350,7 +1353,7 @@ cs_update_bat( sql_trans *tr, sql_delta 
                        oid start = tids->tseqbase, offset = start;
                        oid end = start + ucnt;
 
-                       for(segment *seg = s->segs->h; seg && res == LOG_OK ; 
seg=seg->next) {
+                       for(segment *seg = s->segs->h; seg && res == LOG_OK ; 
seg=ATOMIC_PTR_GET(&seg->next)) {
                                if (seg->start <= start && seg->end > start) {
                                        /* check for delete conflicts */
                                        if (seg->ts >= tr->ts && seg->deleted) {
@@ -1395,7 +1398,7 @@ cs_update_bat( sql_trans *tr, sql_delta 
                        while ( seg && res == LOG_OK && i < ucnt) {
                                oid rid = canditer_next(&ci);
                                if (seg->end <= rid)
-                                       seg = seg->next;
+                                       seg = ATOMIC_PTR_GET(&seg->next);
                                else if (seg->start <= rid && seg->end > rid) {
                                        /* check for delete conflicts */
                                        if (seg->ts >= tr->ts && seg->deleted) {
@@ -1435,7 +1438,7 @@ cs_update_bat( sql_trans *tr, sql_delta 
                        segment *seg = s->segs->h;
                        while ( seg && res == LOG_OK && i < ucnt) {
                                if (seg->end <= rid[i])
-                                       seg = seg->next;
+                                       seg = ATOMIC_PTR_GET(&seg->next);
                                else if (seg->start <= rid[i] && seg->end > 
rid[i]) {
                                        /* check for delete conflicts */
                                        if (seg->ts >= tr->ts && seg->deleted) {
@@ -2340,7 +2343,7 @@ storage_delete_val(sql_trans *tr, sql_ta
        lock_table(tr->store, t->base.id);
        /* find segment of rid, split, mark new segment deleted (for tr->tid) */
        segment *seg = s->segs->h, *p = NULL;
-       for (; seg; p = seg, seg = seg->next) {
+       for (; seg; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
                if (seg->start <= rid && seg->end > rid) {
                        if (!SEG_VALID_4_DELETE(seg,tr)) {
                                unlock_table(tr->store, t->base.id);
@@ -2367,7 +2370,7 @@ static int
 seg_delete_range(sql_trans *tr, sql_table *t, storage *s, segment **Seg, 
size_t start, size_t cnt)
 {
        segment *seg = *Seg, *p = NULL;
-       for (; seg; p = seg, seg = seg->next) {
+       for (; seg; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
                if (seg->start <= start && seg->end > start) {
                        size_t lcnt = cnt;
                        if (start+lcnt > seg->end)
@@ -2489,7 +2492,8 @@ destroy_segments(segments *s)
                return;
        segment *seg = s->h;
        while(seg) {
-               segment *n = seg->next;
+               segment *n = ATOMIC_PTR_GET(&seg->next);
+               ATOMIC_PTR_DESTROY(&seg->next);
                _DELETE(seg);
                seg = n;
        }
@@ -2518,11 +2522,11 @@ static int
 segments_conflict(sql_trans *tr, segments *segs, int uncommitted)
 {
        if (uncommitted) {
-               for (segment *s = segs->h; s; s = s->next)
+               for (segment *s = segs->h; s; s = ATOMIC_PTR_GET(&s->next))
                        if (!VALID_4_READ(s->ts,tr))
                                return 1;
        } else {
-               for (segment *s = segs->h; s; s = s->next)
+               for (segment *s = segs->h; s; s = ATOMIC_PTR_GET(&s->next))
                        if (s->ts < TRANSACTION_ID_BASE && 
!VALID_4_READ(s->ts,tr))
                                return 1;
        }
@@ -2736,7 +2740,7 @@ count_segs(segment *s)
 {
        size_t nr = 0;
 
-       for( ; s; s = s->next)
+       for( ; s; s = ATOMIC_PTR_GET(&s->next))
                nr++;
        return nr;
 }
@@ -3272,7 +3276,7 @@ load_storage(sql_trans *tr, sql_table *t
                        bat_iterator_end(&bi);
                }
                if (ok == LOG_OK)
-                       for (segment *seg = s->segs->h; seg; seg = seg->next)
+                       for (segment *seg = s->segs->h; seg; seg = 
ATOMIC_PTR_GET(&seg->next))
                                if (seg->ts == tr->tid)
                                        seg->ts = 1;
        } else {
@@ -3349,7 +3353,7 @@ log_segments(sql_trans *tr, segments *se
 {
        /* log segments */
        lock_table(tr->store, id);
-       for (segment *seg = segs->h; seg; seg=seg->next) {
+       for (segment *seg = segs->h; seg; seg=ATOMIC_PTR_GET(&seg->next)) {
                unlock_table(tr->store, id);
                if (seg->ts == tr->tid && seg->end-seg->start) {
                        if (log_segment(tr, seg, id) != LOG_OK) {
@@ -3870,7 +3874,7 @@ log_table_append(sql_trans *tr, sql_tabl
        size_t nr_appends = 0;
 
        lock_table(tr->store, t->base.id);
-       for (segment *seg = segs->h; seg; seg=seg->next) {
+       for (segment *seg = segs->h; seg; seg=ATOMIC_PTR_GET(&seg->next)) {
                unlock_table(tr->store, t->base.id);
 
_______________________________________________
checkin-list mailing list -- checkin-list@monetdb.org
To unsubscribe send an email to checkin-list-le...@monetdb.org

Reply via email to