Changeset: ef8252a0c336 for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=ef8252a0c336 Added Files: sql/test/BugTracker-2013/Tests/alter_resets_readonly.Bug-3362.sql sql/test/BugTracker-2013/Tests/alter_resets_readonly.Bug-3362.stable.err sql/test/BugTracker-2013/Tests/alter_resets_readonly.Bug-3362.stable.out Modified Files: clients/Tests/exports.stable.out configure.ag monetdb5/extras/jaql/jaqlscenario.c monetdb5/mal/mal.c monetdb5/mal/mal_client.c monetdb5/mal/mal_dataflow.c monetdb5/mal/mal_dataflow.h monetdb5/mal/mal_debugger.c monetdb5/mal/mal_import.c monetdb5/mal/mal_private.h monetdb5/mal/mal_session.c monetdb5/modules/mal/mdb.c sql/backends/monet5/sql_scenario.c sql/storage/store.c sql/test/BugTracker-2013/Tests/All sql/test/BugTracker-2013/Tests/nestedcalls.sql sql/test/BugTracker-2013/Tests/nestedcalls.stable.out Branch: default Log Message:
merge with default diffs (truncated from 874 to 300 lines): diff --git a/clients/Tests/exports.stable.out b/clients/Tests/exports.stable.out --- a/clients/Tests/exports.stable.out +++ b/clients/Tests/exports.stable.out @@ -3062,7 +3062,6 @@ str startProfiling(void); str startRef; void stopHeartbeat(void); void stopHttpdaemon(void); -void stopMALdataflow(void); str stopProfiling(void); str stopRef; void strAfterCall(ValPtr v, ValPtr bak); diff --git a/configure.ag b/configure.ag --- a/configure.ag +++ b/configure.ag @@ -1261,7 +1261,7 @@ if test "x$have_python3" != xno; then AC_MSG_ERROR([Python3 executable not found]) else have_python3=no - why_have_python2="(Python 3 executable not found)" + why_have_python3="(Python 3 executable not found)" fi fi fi diff --git a/monetdb5/extras/jaql/jaqlscenario.c b/monetdb5/extras/jaql/jaqlscenario.c --- a/monetdb5/extras/jaql/jaqlscenario.c +++ b/monetdb5/extras/jaql/jaqlscenario.c @@ -294,6 +294,7 @@ JAQLengine(Client c) printtree(c->fdout, j->p, 0, j->planf); mnstr_printf(c->fdout, "\n"); freetree(j->p); + c->glb = oldglb; return MAL_SUCCEED; /* don't have a plan generated */ } else if (j->debug) { msg = runMALDebugger(c, c->curprg); diff --git a/monetdb5/mal/mal.c b/monetdb5/mal/mal.c --- a/monetdb5/mal/mal.c +++ b/monetdb5/mal/mal.c @@ -192,6 +192,7 @@ char *mal_trace; /* enable profile even #include "mal_dataflow.h" #include "mal_profiler.h" #include "mal_http_daemon.h" +#include "mal_private.h" MT_Lock mal_contextLock MT_LOCK_INITIALIZER("mal_contextLock"); MT_Lock mal_namespaceLock MT_LOCK_INITIALIZER("mal_namespaceLock"); diff --git a/monetdb5/mal/mal_client.c b/monetdb5/mal/mal_client.c --- a/monetdb5/mal/mal_client.c +++ b/monetdb5/mal/mal_client.c @@ -171,7 +171,7 @@ MCnewClient(void) Client MCgetClient(int id) { - if (id < 0 || id > MAL_MAXCLIENTS) + if (id < 0 || id >= MAL_MAXCLIENTS) return NULL; return mal_clients + id; } @@ -430,7 +430,7 @@ MCcleanupClients(void) str MCsuspendClient(int id) { - if (id < 0 || id > MAL_MAXCLIENTS) + if (id < 0 || id >= MAL_MAXCLIENTS) throw(INVCRED, "mal.clients", INVCRED_WRONG_ID); mal_clients[id].itrace = 'S'; return MAL_SUCCEED; @@ -439,7 +439,7 @@ MCsuspendClient(int id) str MCawakeClient(int id) { - if (id < 0 || id > MAL_MAXCLIENTS) + if (id < 0 || id >= MAL_MAXCLIENTS) throw(INVCRED, "mal.clients", INVCRED_WRONG_ID); mal_clients[id].itrace = 0; return MAL_SUCCEED; diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c --- a/monetdb5/mal/mal_dataflow.c +++ b/monetdb5/mal/mal_dataflow.c @@ -3,19 +3,19 @@ * Version 1.1 (the "License"); you may not use this file except in * compliance with the License. You may obtain a copy of the License at * http://www.monetdb.org/Legal/MonetDBLicense - * + * * Software distributed under the License is distributed on an "AS IS" * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the * License for the specific language governing rights and limitations * under the License. - * + * * The Original Code is the MonetDB Database System. - * + * * The Initial Developer of the Original Code is CWI. * Portions created by CWI are Copyright (C) 1997-July 2008 CWI. * Copyright August 2008-2013 MonetDB B.V. * All Rights Reserved. -*/ + */ /* * Out of order execution @@ -37,7 +37,7 @@ */ #include "monetdb_config.h" #include "mal_dataflow.h" -#include "mal_client.h" +#include "mal_private.h" #define DFLOWpending 0 /* runnable */ #define DFLOWrunning 1 /* currently in progress */ @@ -60,9 +60,11 @@ typedef struct FLOWEVENT { typedef struct queue { int size; /* size of queue */ int last; /* last element in the queue */ + int exitcount; /* how many threads should exit */ FlowEvent *data; MT_Lock l; /* it's a shared resource, ie we need locks */ MT_Sema s; /* threads wait on empty queues */ + MT_Sema e; /* synchronize exiting of thread */ } Queue; /* @@ -83,9 +85,11 @@ typedef struct DATAFLOW { Queue *done; /* instructions handled */ } *DataFlow, DataFlowRec; -#define MAXQ 256 -static Queue *todos[MAXQ] = {0}; /* pending instructions organized by dataflow block */ -static bit occupied[MAXQ]={0}; /* worker pool is in use? */ +static struct worker { + MT_Id id; + enum {IDLE, RUNNING, EXITED} flag; +} workers[THREADS]; +static Queue *todo = 0; /* pending instructions */ static int volatile exiting = 0; /* @@ -122,10 +126,12 @@ q_create(int sz, const char *name) GDKfree(q); return NULL; } + q->exitcount = 0; (void) name; /* in case MT_LOCK_TRACE is not enabled in gdk_system.h */ MT_lock_init(&q->l, name); MT_sema_init(&q->s, 0, name); + MT_sema_init(&q->e, 0, name); return q; } @@ -211,6 +217,12 @@ q_dequeue(Queue *q) if (exiting) return NULL; MT_lock_set(&q->l, "q_dequeue"); + if (q->exitcount > 0) { + q->exitcount--; + MT_lock_unset(&q->l, "q_dequeue"); + MT_sema_up(&q->e, "q_dequeue"); + return NULL; + } assert(q->last > 0); if (q->last > 0) { /* LIFO favors garbage collection */ @@ -253,13 +265,14 @@ q_dequeue(Queue *q) */ static void -DFLOWworker(void *t) +DFLOWworker(void *T) { + struct worker *t = (struct worker *) T; DataFlow flow; FlowEvent fe = 0, fnxt = 0; + int id = (int) (t - workers); Thread thr; str error = 0; - Queue *todo = *(Queue **) t; int i,last; thr = THRnew("DFLOWworker"); @@ -267,9 +280,10 @@ DFLOWworker(void *t) GDKsetbuf(GDKmalloc(GDKMAXERRLEN)); /* where to leave errors */ GDKerrbuf[0] = 0; while (1) { - if (fnxt == 0) - fe = q_dequeue(todo); - else + if (fnxt == 0) { + if ((fe = q_dequeue(todo)) == NULL) + break;; + } else fe = fnxt; if (exiting) { break; @@ -297,8 +311,8 @@ DFLOWworker(void *t) } #endif error = runMALsequence(flow->cntxt, flow->mb, fe->pc, fe->pc + 1, flow->stk, 0, 0); - PARDEBUG mnstr_printf(GDKstdout, "#executed pc= %d wrk= %d claim= " LLFMT "," LLFMT " %s\n", - fe->pc, (int)((Queue **)t - todos), fe->argclaim, fe->hotclaim, error ? error : ""); + PARDEBUG fprintf(stderr, "#executed pc= %d wrk= %d claim= " LLFMT "," LLFMT " %s\n", + fe->pc, id, fe->argclaim, fe->hotclaim, error ? error : ""); #ifdef USE_MAL_ADMISSION /* release the memory claim */ MALadmission(-fe->argclaim, -fe->hotclaim); @@ -308,7 +322,7 @@ DFLOWworker(void *t) if (error) { MT_lock_set(&flow->flowlock, "runMALdataflow"); /* only collect one error (from one thread, needed for stable testing) */ - if (!flow->error) + if (!flow->error) flow->error = error; MT_lock_unset(&flow->flowlock, "runMALdataflow"); /* after an error we skip the rest of the block */ @@ -334,7 +348,7 @@ DFLOWworker(void *t) } #endif MT_lock_set(&flow->flowlock, "MALworker"); - + for (last = fe->pc - flow->start; last >= 0 && (i = flow->nodes[last]) > 0; last = flow->edges[last]) if (flow->status[i].state == DFLOWpending && flow->status[i].blocks == 1) { @@ -358,59 +372,52 @@ DFLOWworker(void *t) GDKfree(GDKerrbuf); GDKsetbuf(0); THRdel(thr); + t->flag = EXITED; } -/* +/* * Create an interpreter pool. * One worker will adaptively be available for each client. * The remainder are taken from the GDKnr_threads argument and - * typically is equal to the number of cores. - * A recursive MAL function call would make for one worker less, - * which limits the number of cores for parallel processing. + * typically is equal to the number of cores * The workers are assembled in a local table to enable debugging. - * - * BEWARE, failure to create a new worker thread is not an error - * but would lead to serial execution. */ static int DFLOWinitialize(void) { - int i, threads, grp; - MT_Id worker; + int i, limit; + int created = 0; - threads = GDKnr_threads ? GDKnr_threads : 1; MT_lock_set(&mal_contextLock, "DFLOWinitialize"); - for(grp = 0; grp< MAXQ; grp++) - if ( occupied[grp] == FALSE){ - occupied[grp] = TRUE; - break; - } - MT_lock_unset(&mal_contextLock, "DFLOWinitialize"); - if (grp > THREADS) { - // continue non-parallel + if (todo) { + /* somebody else beat us to it */ + MT_lock_unset(&mal_contextLock, "DFLOWinitialize"); + return 0; + } + todo = q_create(2048, "DFLOWinitialize"); + if (todo == NULL) { + MT_lock_unset(&mal_contextLock, "DFLOWinitialize"); return -1; } - if ( todos[grp] ) - return grp; + limit = GDKnr_threads ? GDKnr_threads : 1; + for (i = 0; i < limit; i++) { + workers[i].flag = RUNNING; + if (MT_create_thread(&workers[i].id, DFLOWworker, (void *) &workers[i], MT_THR_JOINABLE) < 0) + workers[i].flag = IDLE; + else + created++; + } + if (created == 0) { + /* no threads created */ + q_destroy(todo); + todo = NULL; + MT_lock_unset(&mal_contextLock, "DFLOWinitialize"); + return -1; + } + MT_lock_unset(&mal_contextLock, "DFLOWinitialize"); + return 0; +} - todos[grp] = q_create(2048, "todo"); - if (todos[grp] == NULL) - return -1; - - // associate a set of workers with the pool - for (i = 0; grp>= 0 && i < threads; i++){ _______________________________________________ checkin-list mailing list checkin-list@monetdb.org http://mail.monetdb.org/mailman/listinfo/checkin-list