Changeset: 1544a64058f8 for MonetDB URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=1544a64058f8 Modified Files: common/utils/matomic.h gdk/gdk_system.c gdk/gdk_system.h monetdb5/mal/mal_dataflow.c testing/Mtest.py.in Branch: subquery Log Message:
merged with defaul diffs (297 lines): diff --git a/common/utils/matomic.h b/common/utils/matomic.h --- a/common/utils/matomic.h +++ b/common/utils/matomic.h @@ -14,6 +14,7 @@ * The following operations are defined: * ATOMIC_VAR_INIT -- initializer for the variable (not necessarily atomic!); * ATOMIC_INIT -- initialize the variable (not necessarily atomic!); + * ATOMIC_DESTROY -- destroy the variable * ATOMIC_GET -- return the value of a variable; * ATOMIC_SET -- set the value of a variable; * ATOMIC_XCG -- set the value of a variable, return original value; @@ -33,6 +34,7 @@ * * Some of these are also available for pointers: * ATOMIC_PTR_INIT + * ATOMIC_PTR_DESTROY * ATOMIC_PTR_GET * ATOMIC_PTR_SET * ATOMIC_PTR_XCG @@ -83,6 +85,7 @@ typedef unsigned long long ATOMIC_BASE_T #endif #define ATOMIC_INIT(var, val) atomic_init(var, (ATOMIC_BASE_TYPE) (val)) +#define ATOMIC_DESTROY(var) ((void) 0) #define ATOMIC_GET(var) atomic_load(var) #define ATOMIC_SET(var, val) atomic_store(var, (ATOMIC_BASE_TYPE) (val)) #define ATOMIC_XCG(var, val) atomic_exchange(var, (ATOMIC_BASE_TYPE) (val)) @@ -98,6 +101,7 @@ typedef volatile atomic_address ATOMIC_P typedef void *_Atomic volatile ATOMIC_PTR_TYPE; #endif #define ATOMIC_PTR_INIT(var, val) atomic_init(var, val) +#define ATOMIC_PTR_DESTROY(var) ((void) 0) #define ATOMIC_PTR_VAR_INIT(val) ATOMIC_VAR_INIT(val) #define ATOMIC_PTR_GET(var) atomic_load(var) #define ATOMIC_PTR_SET(var, val) atomic_store(var, (void *) (val)) @@ -136,6 +140,7 @@ typedef volatile int64_t ATOMIC_TYPE; typedef int64_t ATOMIC_BASE_TYPE; #define ATOMIC_VAR_INIT(val) (val) #define ATOMIC_INIT(var, val) (*(var) = (val)) +#define ATOMIC_DESTROY(var) ((void) 0) #ifdef __INTEL_COMPILER #define ATOMIC_GET(var) _InterlockedExchangeAdd64(var, 0) @@ -173,6 +178,7 @@ typedef volatile int ATOMIC_TYPE; typedef int ATOMIC_BASE_TYPE; #define ATOMIC_VAR_INIT(val) (val) #define ATOMIC_INIT(var, val) (*(var) = (val)) +#define ATOMIC_DESTROY(var) ((void) 0) #ifdef __INTEL_COMPILER #define ATOMIC_GET(var) _InterlockedExchangeAdd(var, 0) @@ -207,6 +213,7 @@ ATOMIC_CAS(ATOMIC_TYPE *var, ATOMIC_BASE typedef PVOID volatile ATOMIC_PTR_TYPE; #define ATOMIC_PTR_INIT(var, val) (*(var) = (val)) +#define ATOMIC_PTR_DESTROY(var) ((void) 0) #define ATOMIC_PTR_VAR_INIT(val) (val) #define ATOMIC_PTR_GET(var) (*(var)) #define ATOMIC_PTR_SET(var, val) _InterlockedExchangePointer(var, (PVOID) (val)) @@ -243,6 +250,7 @@ typedef volatile int ATOMIC_TYPE; #endif #define ATOMIC_VAR_INIT(val) (val) #define ATOMIC_INIT(var, val) (*(var) = (val)) +#define ATOMIC_DESTROY(var) ((void) 0) #define ATOMIC_GET(var) __atomic_load_n(var, __ATOMIC_SEQ_CST) #define ATOMIC_SET(var, val) __atomic_store_n(var, (ATOMIC_BASE_TYPE) (val), __ATOMIC_SEQ_CST) @@ -255,6 +263,7 @@ typedef volatile int ATOMIC_TYPE; typedef void *volatile ATOMIC_PTR_TYPE; #define ATOMIC_PTR_INIT(var, val) (*(var) = (val)) +#define ATOMIC_PTR_DESTROY(var) ((void) 0) #define ATOMIC_PTR_GET(var) __atomic_load_n(var, __ATOMIC_SEQ_CST) #define ATOMIC_PTR_SET(var, val) __atomic_store_n(var, (val), __ATOMIC_SEQ_CST) #define ATOMIC_PTR_XCG(var, val) __atomic_exchange_n(var, (val), __ATOMIC_SEQ_CST) @@ -284,6 +293,8 @@ ATOMIC_INIT(ATOMIC_TYPE *var, ATOMIC_BAS } #define ATOMIC_INIT(var, val) ATOMIC_INIT((var), (ATOMIC_BASE_TYPE) (val)) +#define ATOMIC_DESTROY(var) pthread_mutex_destroy(&(var)->lck) + static inline ATOMIC_BASE_TYPE ATOMIC_GET(ATOMIC_TYPE *var) { @@ -306,11 +317,12 @@ ATOMIC_SET(ATOMIC_TYPE *var, ATOMIC_BASE static inline ATOMIC_BASE_TYPE ATOMIC_XCG(ATOMIC_TYPE *var, ATOMIC_BASE_TYPE val) { - ATOMIC_BASE_TYPE new; + ATOMIC_BASE_TYPE old; pthread_mutex_lock(&var->lck); - new = var->val = val; + old = var->val; + var->val = val; pthread_mutex_unlock(&var->lck); - return new; + return old; } #define ATOMIC_XCG(var, val) ATOMIC_XCG(var, (ATOMIC_BASE_TYPE) (val)) @@ -387,6 +399,8 @@ ATOMIC_PTR_INIT(ATOMIC_PTR_TYPE *var, vo var->val = val; } +#define ATOMIC_PTR_DESTROY(var) pthread_mutex_destroy(&(var)->lck) + static inline void * ATOMIC_PTR_GET(ATOMIC_PTR_TYPE *var) { @@ -408,11 +422,12 @@ ATOMIC_PTR_SET(ATOMIC_PTR_TYPE *var, voi static inline void * ATOMIC_PTR_XCG(ATOMIC_PTR_TYPE *var, void *val) { - void *new; + void *old; pthread_mutex_lock(&var->lck); - new = var->val = val; + old = var->val; + var->val = val; pthread_mutex_unlock(&var->lck); - return new; + return old; } static inline bool diff --git a/gdk/gdk_system.c b/gdk/gdk_system.c --- a/gdk/gdk_system.c +++ b/gdk/gdk_system.c @@ -296,6 +296,7 @@ rm_winthread(struct winthread *w) if (*wp) *wp = w->next; LeaveCriticalSection(&winthread_cs); + ATOMIC_DESTROY(&w->exited); free(w); } @@ -617,6 +618,7 @@ rm_posthread_locked(struct posthread *p) ; if (*pp) *pp = p->next; + ATOMIC_DESTROY(&p->exited); free(p); } diff --git a/gdk/gdk_system.h b/gdk/gdk_system.h --- a/gdk/gdk_system.h +++ b/gdk/gdk_system.h @@ -256,6 +256,8 @@ gdk_export int MT_join_thread(MT_Id t); break; \ } \ ATOMIC_CLEAR(&GDKlocklistlock); \ + ATOMIC_DESTROY(&(l)->contention); \ + ATOMIC_DESTROY(&(l)->sleep); \ } \ } while (0) diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c --- a/monetdb5/mal/mal_dataflow.c +++ b/monetdb5/mal/mal_dataflow.c @@ -70,7 +70,7 @@ typedef struct DATAFLOW { MalStkPtr stk; int start, stop; /* guarded block under consideration*/ FlowEvent status; /* status of each instruction */ - str error; /* error encountered */ + ATOMIC_PTR_TYPE error; /* error encountered */ int *nodes; /* dependency graph nodes */ int *edges; /* dependency graph */ MT_Lock flowlock; /* lock to protect the above */ @@ -373,13 +373,10 @@ DFLOWworker(void *T) assert(flow); /* whenever we have a (concurrent) error, skip it */ - MT_lock_set(&flow->flowlock); - if (flow->error) { - MT_lock_unset(&flow->flowlock); + if (ATOMIC_PTR_GET(&flow->error)) { q_enqueue(flow->done, fe); continue; } - MT_lock_unset(&flow->flowlock); #ifdef USE_MAL_ADMISSION if (MALrunningThreads() > 2 && MALadmission(fe->argclaim, fe->hotclaim)) { @@ -411,13 +408,10 @@ DFLOWworker(void *T) fe->state = DFLOWwrapup; MT_lock_unset(&flow->flowlock); if (error) { - MT_lock_set(&flow->flowlock); + void *null = NULL; /* only collect one error (from one thread, needed for stable testing) */ - if (!flow->error) - flow->error = error; - else + if (!ATOMIC_PTR_CAS(&flow->error, &null, error)) GDKfree(error); - MT_lock_unset(&flow->flowlock); /* after an error we skip the rest of the block */ q_enqueue(flow->done, fe); continue; @@ -572,7 +566,7 @@ DFLOWinitBlk(DataFlow flow, MalBlkPtr mb flow->status[n].pc = pc; flow->status[n].state = DFLOWpending; flow->status[n].cost = -1; - flow->status[n].flow->error = NULL; + ATOMIC_PTR_SET(&flow->status[n].flow->error, NULL); /* administer flow dependencies */ for (j = p->retc; j < p->argc; j++) { @@ -784,9 +778,8 @@ DFLOWscheduler(DataFlow flow, struct wor ATOMIC_PTR_SET(&w->cntxt, NULL); /* wrap up errors */ assert(flow->done->last == 0); - if (flow->error ) { - PARDEBUG fprintf(stderr, "#errors encountered %s ", flow->error ? flow->error : "unknown"); - ret = flow->error; + if ((ret = ATOMIC_PTR_XCG(&flow->error, NULL)) != NULL ) { + PARDEBUG fprintf(stderr, "#errors encountered %s ", ret); } return ret; } @@ -912,16 +905,13 @@ runMALdataflow(Client cntxt, MalBlkPtr m flow->cntxt = cntxt; flow->mb = mb; flow->stk = stk; - flow->error = 0; /* keep real block count, exclude brackets */ flow->start = startpc + 1; flow->stop = stoppc; - MT_lock_init(&flow->flowlock, "flow->flowlock"); flow->done = q_create(stoppc- startpc+1, "flow->done"); if (flow->done == NULL) { - MT_lock_destroy(&flow->flowlock); GDKfree(flow); throw(MAL, "dataflow", "runMALdataflow(): Failed to create flow->done queue"); } @@ -929,7 +919,6 @@ runMALdataflow(Client cntxt, MalBlkPtr m flow->status = (FlowEvent)GDKzalloc((stoppc - startpc + 1) * sizeof(FlowEventRec)); if (flow->status == NULL) { q_destroy(flow->done); - MT_lock_destroy(&flow->flowlock); GDKfree(flow); throw(MAL, "dataflow", SQLSTATE(HY001) MAL_MALLOC_FAIL); } @@ -939,7 +928,6 @@ runMALdataflow(Client cntxt, MalBlkPtr m if (flow->nodes == NULL) { GDKfree(flow->status); q_destroy(flow->done); - MT_lock_destroy(&flow->flowlock); GDKfree(flow); throw(MAL, "dataflow", SQLSTATE(HY001) MAL_MALLOC_FAIL); } @@ -948,10 +936,11 @@ runMALdataflow(Client cntxt, MalBlkPtr m GDKfree(flow->nodes); GDKfree(flow->status); q_destroy(flow->done); - MT_lock_destroy(&flow->flowlock); GDKfree(flow); throw(MAL, "dataflow", SQLSTATE(HY001) MAL_MALLOC_FAIL); } + MT_lock_init(&flow->flowlock, "flow->flowlock"); + ATOMIC_PTR_INIT(&flow->error, NULL); msg = DFLOWinitBlk(flow, mb, size); if (msg == MAL_SUCCEED) @@ -962,6 +951,7 @@ runMALdataflow(Client cntxt, MalBlkPtr m GDKfree(flow->nodes); q_destroy(flow->done); MT_lock_destroy(&flow->flowlock); + ATOMIC_PTR_DESTROY(&flow->error); GDKfree(flow); /* we created one worker, now tell one worker to exit again */ diff --git a/testing/Mtest.py.in b/testing/Mtest.py.in --- a/testing/Mtest.py.in +++ b/testing/Mtest.py.in @@ -2803,8 +2803,8 @@ def killProc(proc, outfile = None, cmd = proc.killed = True if proc.onechild: if procdebug: - print('killProc: calling proc.terminate() on PID %d' % proc.pid) - proc.terminate() + print('killProc: calling proc.kill() on PID %d' % proc.pid) + proc.kill() elif os.name == 'nt': if procdebug: print('killProc: starting process "taskkill" "/F" "/T" "/PID" "%s"\n' % str(proc.pid)) @@ -2814,7 +2814,7 @@ def killProc(proc, outfile = None, cmd = out, err = p.communicate() if procdebug: print('killProc: process exited "taskkill" "/F" "/T" "/PID" "%s" (%s)\n' % (str(proc.pid), proc.returncode)) - proc.terminate() + proc.kill() else: killchildren(proc.pid) _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list