Changeset: 1544a64058f8 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=1544a64058f8
Modified Files:
        common/utils/matomic.h
        gdk/gdk_system.c
        gdk/gdk_system.h
        monetdb5/mal/mal_dataflow.c
        testing/Mtest.py.in
Branch: subquery
Log Message:

merged with default


diffs (297 lines):

diff --git a/common/utils/matomic.h b/common/utils/matomic.h
--- a/common/utils/matomic.h
+++ b/common/utils/matomic.h
@@ -14,6 +14,7 @@
  * The following operations are defined:
  * ATOMIC_VAR_INIT -- initializer for the variable (not necessarily atomic!);
  * ATOMIC_INIT -- initialize the variable (not necessarily atomic!);
+ * ATOMIC_DESTROY -- destroy the variable
  * ATOMIC_GET -- return the value of a variable;
  * ATOMIC_SET -- set the value of a variable;
  * ATOMIC_XCG -- set the value of a variable, return original value;
@@ -33,6 +34,7 @@
  *
  * Some of these are also available for pointers:
  * ATOMIC_PTR_INIT
+ * ATOMIC_PTR_DESTROY
  * ATOMIC_PTR_GET
  * ATOMIC_PTR_SET
  * ATOMIC_PTR_XCG
@@ -83,6 +85,7 @@ typedef unsigned long long ATOMIC_BASE_T
 #endif
 
 #define ATOMIC_INIT(var, val)  atomic_init(var, (ATOMIC_BASE_TYPE) (val))
+#define ATOMIC_DESTROY(var)    ((void) 0)
 #define ATOMIC_GET(var)                atomic_load(var)
 #define ATOMIC_SET(var, val)   atomic_store(var, (ATOMIC_BASE_TYPE) (val))
 #define ATOMIC_XCG(var, val)   atomic_exchange(var, (ATOMIC_BASE_TYPE) (val))
@@ -98,6 +101,7 @@ typedef volatile atomic_address ATOMIC_P
 typedef void *_Atomic volatile ATOMIC_PTR_TYPE;
 #endif
 #define ATOMIC_PTR_INIT(var, val)      atomic_init(var, val)
+#define ATOMIC_PTR_DESTROY(var)                ((void) 0)
 #define ATOMIC_PTR_VAR_INIT(val)       ATOMIC_VAR_INIT(val)
 #define ATOMIC_PTR_GET(var)            atomic_load(var)
 #define ATOMIC_PTR_SET(var, val)       atomic_store(var, (void *) (val))
@@ -136,6 +140,7 @@ typedef volatile int64_t ATOMIC_TYPE;
 typedef int64_t ATOMIC_BASE_TYPE;
 #define ATOMIC_VAR_INIT(val)   (val)
 #define ATOMIC_INIT(var, val)  (*(var) = (val))
+#define ATOMIC_DESTROY(var)    ((void) 0)
 
 #ifdef __INTEL_COMPILER
 #define ATOMIC_GET(var)                _InterlockedExchangeAdd64(var, 0)
@@ -173,6 +178,7 @@ typedef volatile int ATOMIC_TYPE;
 typedef int ATOMIC_BASE_TYPE;
 #define ATOMIC_VAR_INIT(val)   (val)
 #define ATOMIC_INIT(var, val)  (*(var) = (val))
+#define ATOMIC_DESTROY(var)    ((void) 0)
 
 #ifdef __INTEL_COMPILER
 #define ATOMIC_GET(var)                _InterlockedExchangeAdd(var, 0)
@@ -207,6 +213,7 @@ ATOMIC_CAS(ATOMIC_TYPE *var, ATOMIC_BASE
 
 typedef PVOID volatile ATOMIC_PTR_TYPE;
 #define ATOMIC_PTR_INIT(var, val)      (*(var) = (val))
+#define ATOMIC_PTR_DESTROY(var)                ((void) 0)
 #define ATOMIC_PTR_VAR_INIT(val)       (val)
 #define ATOMIC_PTR_GET(var)            (*(var))
 #define ATOMIC_PTR_SET(var, val)       _InterlockedExchangePointer(var, 
(PVOID) (val))
@@ -243,6 +250,7 @@ typedef volatile int ATOMIC_TYPE;
 #endif
 #define ATOMIC_VAR_INIT(val)   (val)
 #define ATOMIC_INIT(var, val)  (*(var) = (val))
+#define ATOMIC_DESTROY(var)    ((void) 0)
 
 #define ATOMIC_GET(var)                __atomic_load_n(var, __ATOMIC_SEQ_CST)
 #define ATOMIC_SET(var, val)   __atomic_store_n(var, (ATOMIC_BASE_TYPE) (val), 
__ATOMIC_SEQ_CST)
@@ -255,6 +263,7 @@ typedef volatile int ATOMIC_TYPE;
 
 typedef void *volatile ATOMIC_PTR_TYPE;
 #define ATOMIC_PTR_INIT(var, val)      (*(var) = (val))
+#define ATOMIC_PTR_DESTROY(var)                ((void) 0)
 #define ATOMIC_PTR_GET(var)            __atomic_load_n(var, __ATOMIC_SEQ_CST)
 #define ATOMIC_PTR_SET(var, val)       __atomic_store_n(var, (val), 
__ATOMIC_SEQ_CST)
 #define ATOMIC_PTR_XCG(var, val)       __atomic_exchange_n(var, (val), 
__ATOMIC_SEQ_CST)
@@ -284,6 +293,8 @@ ATOMIC_INIT(ATOMIC_TYPE *var, ATOMIC_BAS
 }
 #define ATOMIC_INIT(var, val)  ATOMIC_INIT((var), (ATOMIC_BASE_TYPE) (val))
 
+#define ATOMIC_DESTROY(var)    pthread_mutex_destroy(&(var)->lck)
+
 static inline ATOMIC_BASE_TYPE
 ATOMIC_GET(ATOMIC_TYPE *var)
 {
@@ -306,11 +317,12 @@ ATOMIC_SET(ATOMIC_TYPE *var, ATOMIC_BASE
 static inline ATOMIC_BASE_TYPE
 ATOMIC_XCG(ATOMIC_TYPE *var, ATOMIC_BASE_TYPE val)
 {
-       ATOMIC_BASE_TYPE new;
+       ATOMIC_BASE_TYPE old;
        pthread_mutex_lock(&var->lck);
-       new = var->val = val;
+       old = var->val;
+       var->val = val;
        pthread_mutex_unlock(&var->lck);
-       return new;
+       return old;
 }
 #define ATOMIC_XCG(var, val)   ATOMIC_XCG(var, (ATOMIC_BASE_TYPE) (val))
 
@@ -387,6 +399,8 @@ ATOMIC_PTR_INIT(ATOMIC_PTR_TYPE *var, vo
        var->val = val;
 }
 
+#define ATOMIC_PTR_DESTROY(var)        pthread_mutex_destroy(&(var)->lck)
+
 static inline void *
 ATOMIC_PTR_GET(ATOMIC_PTR_TYPE *var)
 {
@@ -408,11 +422,12 @@ ATOMIC_PTR_SET(ATOMIC_PTR_TYPE *var, voi
 static inline void *
 ATOMIC_PTR_XCG(ATOMIC_PTR_TYPE *var, void *val)
 {
-       void *new;
+       void *old;
        pthread_mutex_lock(&var->lck);
-       new = var->val = val;
+       old = var->val;
+       var->val = val;
        pthread_mutex_unlock(&var->lck);
-       return new;
+       return old;
 }
 
 static inline bool
diff --git a/gdk/gdk_system.c b/gdk/gdk_system.c
--- a/gdk/gdk_system.c
+++ b/gdk/gdk_system.c
@@ -296,6 +296,7 @@ rm_winthread(struct winthread *w)
        if (*wp)
                *wp = w->next;
        LeaveCriticalSection(&winthread_cs);
+       ATOMIC_DESTROY(&w->exited);
        free(w);
 }
 
@@ -617,6 +618,7 @@ rm_posthread_locked(struct posthread *p)
                ;
        if (*pp)
                *pp = p->next;
+       ATOMIC_DESTROY(&p->exited);
        free(p);
 }
 
diff --git a/gdk/gdk_system.h b/gdk/gdk_system.h
--- a/gdk/gdk_system.h
+++ b/gdk/gdk_system.h
@@ -256,6 +256,8 @@ gdk_export int MT_join_thread(MT_Id t);
                                        break;                          \
                                }                                       \
                        ATOMIC_CLEAR(&GDKlocklistlock);                 \
+                       ATOMIC_DESTROY(&(l)->contention);               \
+                       ATOMIC_DESTROY(&(l)->sleep);                    \
                }                                                       \
        } while (0)
 
diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
--- a/monetdb5/mal/mal_dataflow.c
+++ b/monetdb5/mal/mal_dataflow.c
@@ -70,7 +70,7 @@ typedef struct DATAFLOW {
        MalStkPtr stk;
        int start, stop;    /* guarded block under consideration*/
        FlowEvent status;   /* status of each instruction */
-       str error;          /* error encountered */
+       ATOMIC_PTR_TYPE error;          /* error encountered */
        int *nodes;         /* dependency graph nodes */
        int *edges;         /* dependency graph */
        MT_Lock flowlock;   /* lock to protect the above */
@@ -373,13 +373,10 @@ DFLOWworker(void *T)
                assert(flow);
 
                /* whenever we have a (concurrent) error, skip it */
-               MT_lock_set(&flow->flowlock);
-               if (flow->error) {
-                       MT_lock_unset(&flow->flowlock);
+               if (ATOMIC_PTR_GET(&flow->error)) {
                        q_enqueue(flow->done, fe);
                        continue;
                }
-               MT_lock_unset(&flow->flowlock);
 
 #ifdef USE_MAL_ADMISSION
                if (MALrunningThreads() > 2 && MALadmission(fe->argclaim, 
fe->hotclaim)) {
@@ -411,13 +408,10 @@ DFLOWworker(void *T)
                fe->state = DFLOWwrapup;
                MT_lock_unset(&flow->flowlock);
                if (error) {
-                       MT_lock_set(&flow->flowlock);
+                       void *null = NULL;
                        /* only collect one error (from one thread, needed for 
stable testing) */
-                       if (!flow->error)
-                               flow->error = error;
-                       else
+                       if (!ATOMIC_PTR_CAS(&flow->error, &null, error))
                                GDKfree(error);
-                       MT_lock_unset(&flow->flowlock);
                        /* after an error we skip the rest of the block */
                        q_enqueue(flow->done, fe);
                        continue;
@@ -572,7 +566,7 @@ DFLOWinitBlk(DataFlow flow, MalBlkPtr mb
                flow->status[n].pc = pc;
                flow->status[n].state = DFLOWpending;
                flow->status[n].cost = -1;
-               flow->status[n].flow->error = NULL;
+               ATOMIC_PTR_SET(&flow->status[n].flow->error, NULL);
 
                /* administer flow dependencies */
                for (j = p->retc; j < p->argc; j++) {
@@ -784,9 +778,8 @@ DFLOWscheduler(DataFlow flow, struct wor
        ATOMIC_PTR_SET(&w->cntxt, NULL);
        /* wrap up errors */
        assert(flow->done->last == 0);
-       if (flow->error ) {
-               PARDEBUG fprintf(stderr, "#errors encountered %s ", flow->error 
? flow->error : "unknown");
-               ret = flow->error;
+       if ((ret = ATOMIC_PTR_XCG(&flow->error, NULL)) != NULL ) {
+               PARDEBUG fprintf(stderr, "#errors encountered %s ", ret);
        }
        return ret;
 }
@@ -912,16 +905,13 @@ runMALdataflow(Client cntxt, MalBlkPtr m
        flow->cntxt = cntxt;
        flow->mb = mb;
        flow->stk = stk;
-       flow->error = 0;
 
        /* keep real block count, exclude brackets */
        flow->start = startpc + 1;
        flow->stop = stoppc;
 
-       MT_lock_init(&flow->flowlock, "flow->flowlock");
        flow->done = q_create(stoppc- startpc+1, "flow->done");
        if (flow->done == NULL) {
-               MT_lock_destroy(&flow->flowlock);
                GDKfree(flow);
                throw(MAL, "dataflow", "runMALdataflow(): Failed to create 
flow->done queue");
        }
@@ -929,7 +919,6 @@ runMALdataflow(Client cntxt, MalBlkPtr m
        flow->status = (FlowEvent)GDKzalloc((stoppc - startpc + 1) * 
sizeof(FlowEventRec));
        if (flow->status == NULL) {
                q_destroy(flow->done);
-               MT_lock_destroy(&flow->flowlock);
                GDKfree(flow);
                throw(MAL, "dataflow", SQLSTATE(HY001) MAL_MALLOC_FAIL);
        }
@@ -939,7 +928,6 @@ runMALdataflow(Client cntxt, MalBlkPtr m
        if (flow->nodes == NULL) {
                GDKfree(flow->status);
                q_destroy(flow->done);
-               MT_lock_destroy(&flow->flowlock);
                GDKfree(flow);
                throw(MAL, "dataflow", SQLSTATE(HY001) MAL_MALLOC_FAIL);
        }
@@ -948,10 +936,11 @@ runMALdataflow(Client cntxt, MalBlkPtr m
                GDKfree(flow->nodes);
                GDKfree(flow->status);
                q_destroy(flow->done);
-               MT_lock_destroy(&flow->flowlock);
                GDKfree(flow);
                throw(MAL, "dataflow", SQLSTATE(HY001) MAL_MALLOC_FAIL);
        }
+       MT_lock_init(&flow->flowlock, "flow->flowlock");
+       ATOMIC_PTR_INIT(&flow->error, NULL);
        msg = DFLOWinitBlk(flow, mb, size);
 
        if (msg == MAL_SUCCEED)
@@ -962,6 +951,7 @@ runMALdataflow(Client cntxt, MalBlkPtr m
        GDKfree(flow->nodes);
        q_destroy(flow->done);
        MT_lock_destroy(&flow->flowlock);
+       ATOMIC_PTR_DESTROY(&flow->error);
        GDKfree(flow);
 
        /* we created one worker, now tell one worker to exit again */
diff --git a/testing/Mtest.py.in b/testing/Mtest.py.in
--- a/testing/Mtest.py.in
+++ b/testing/Mtest.py.in
@@ -2803,8 +2803,8 @@ def killProc(proc, outfile = None, cmd =
     proc.killed = True
     if proc.onechild:
         if procdebug:
-            print('killProc: calling proc.terminate() on PID %d' % proc.pid)
-        proc.terminate()
+            print('killProc: calling proc.kill() on PID %d' % proc.pid)
+        proc.kill()
     elif os.name == 'nt':
         if procdebug:
             print('killProc: starting process "taskkill" "/F" "/T" "/PID" 
"%s"\n' % str(proc.pid))
@@ -2814,7 +2814,7 @@ def killProc(proc, outfile = None, cmd =
         out, err = p.communicate()
         if procdebug:
             print('killProc: process exited "taskkill" "/F" "/T" "/PID" "%s" 
(%s)\n' % (str(proc.pid), proc.returncode))
-        proc.terminate()
+        proc.kill()
     else:
         killchildren(proc.pid)
 
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to