Changeset: 03f166caaf90 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/03f166caaf90
Modified Files:
        gdk/gdk_bat.c
        monetdb5/mal/mal_dataflow.c
Branch: default
Log Message:

Fix some data races.
In BATsetaccess, hold the BAT's theaplock while changing the access
mode, and don't change heaps that the BAT does not own (views).
In dataflow, always UP the semaphore when exiting; otherwise there is a
race condition (not just a data race).


diffs (91 lines):

diff --git a/gdk/gdk_bat.c b/gdk/gdk_bat.c
--- a/gdk/gdk_bat.c
+++ b/gdk/gdk_bat.c
@@ -2282,44 +2282,54 @@ BATsetaccess(BAT *b, restrict_t newmode)
                        return NULL;
                b = bn;
        }
+       MT_lock_set(&b->theaplock);
        bakmode = (restrict_t) b->batRestricted;
        bakdirty = b->batDirtydesc;
        if (bakmode != newmode) {
                bool existing = (BBP_status(b->batCacheid) & BBPEXISTING) != 0;
                bool wr = (newmode == BAT_WRITE);
                bool rd = (bakmode == BAT_WRITE);
-               storage_t m1, m3 = STORE_MEM;
-               storage_t b1, b3 = STORE_MEM;
+               storage_t m1 = STORE_MEM, m3 = STORE_MEM;
+               storage_t b1 = STORE_MEM, b3 = STORE_MEM;
 
-               b1 = b->theap->newstorage;
-               m1 = HEAPchangeaccess(b->theap, ACCESSMODE(wr, rd), existing);
-               if (b->tvheap) {
+               if (b->theap->parentid == b->batCacheid) {
+                       b1 = b->theap->newstorage;
+                       m1 = HEAPchangeaccess(b->theap, ACCESSMODE(wr, rd), existing);
+               }
+               if (b->tvheap && b->tvheap->parentid == b->batCacheid) {
                        bool ta = (newmode == BAT_APPEND && ATOMappendpriv(b->ttype, b->tvheap));
                        b3 = b->tvheap->newstorage;
                        m3 = HEAPchangeaccess(b->tvheap, ACCESSMODE(wr && ta, rd && ta), existing);
                }
                if (m1 == STORE_INVALID || m3 == STORE_INVALID) {
+                       MT_lock_unset(&b->theaplock);
                        BBPunfix(b->batCacheid);
                        return NULL;
                }
 
                /* set new access mode and mmap modes */
-               b->batRestricted = (unsigned int) newmode;
+               b->batRestricted = newmode;
                b->batDirtydesc = true;
-               b->theap->newstorage = m1;
-               if (b->tvheap)
+               if (b->theap->parentid == b->batCacheid)
+                       b->theap->newstorage = m1;
+               if (b->tvheap && b->tvheap->parentid == b->batCacheid)
                        b->tvheap->newstorage = m3;
 
-               if (existing && BBPsave(b) != GDK_SUCCEED) {
+               MT_lock_unset(&b->theaplock);
+               if (existing && !isVIEW(b) && BBPsave(b) != GDK_SUCCEED) {
                        /* roll back all changes */
-                       b->batRestricted = (unsigned int) bakmode;
+                       MT_lock_set(&b->theaplock);
+                       b->batRestricted = bakmode;
                        b->batDirtydesc = bakdirty;
                        b->theap->newstorage = b1;
                        if (b->tvheap)
                                b->tvheap->newstorage = b3;
+                       MT_lock_unset(&b->theaplock);
                        BBPunfix(b->batCacheid);
                        return NULL;
                }
+       } else {
+               MT_lock_unset(&b->theaplock);
        }
        return b;
 }
diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
--- a/monetdb5/mal/mal_dataflow.c
+++ b/monetdb5/mal/mal_dataflow.c
@@ -318,6 +318,9 @@ DFLOWworker(void *T)
                        /* wait until we are allowed to start working */
                        MT_sema_down(&t->s);
                        t->flag = RUNNING;
+                       if (ATOMIC_GET(&exiting)) {
+                               break;
+                       }
                }
                assert(t->flag == RUNNING);
                cntxt = ATOMIC_PTR_GET(&t->cntxt);
@@ -980,8 +983,7 @@ stopMALdataflow(void)
                MT_lock_set(&dataflowLock);
                /* first wake up all running threads */
                for (i = 0; i < THREADS; i++) {
-                       if (workers[i].flag == RUNNING)
-                               MT_sema_up(&todo->s);
+                       MT_sema_up(&todo->s);
                }
                for (i = free_workers; i >= 0; i = workers[i].next) {
                        MT_sema_up(&workers[i].s);
_______________________________________________
checkin-list mailing list -- checkin-list@monetdb.org
To unsubscribe send an email to checkin-list-le...@monetdb.org

Reply via email to