Changeset: ef8252a0c336 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=ef8252a0c336
Added Files:
        sql/test/BugTracker-2013/Tests/alter_resets_readonly.Bug-3362.sql
        sql/test/BugTracker-2013/Tests/alter_resets_readonly.Bug-3362.stable.err
        sql/test/BugTracker-2013/Tests/alter_resets_readonly.Bug-3362.stable.out
Modified Files:
        clients/Tests/exports.stable.out
        configure.ag
        monetdb5/extras/jaql/jaqlscenario.c
        monetdb5/mal/mal.c
        monetdb5/mal/mal_client.c
        monetdb5/mal/mal_dataflow.c
        monetdb5/mal/mal_dataflow.h
        monetdb5/mal/mal_debugger.c
        monetdb5/mal/mal_import.c
        monetdb5/mal/mal_private.h
        monetdb5/mal/mal_session.c
        monetdb5/modules/mal/mdb.c
        sql/backends/monet5/sql_scenario.c
        sql/storage/store.c
        sql/test/BugTracker-2013/Tests/All
        sql/test/BugTracker-2013/Tests/nestedcalls.sql
        sql/test/BugTracker-2013/Tests/nestedcalls.stable.out
Branch: default
Log Message:

merge with default


diffs (truncated from 874 to 300 lines):

diff --git a/clients/Tests/exports.stable.out b/clients/Tests/exports.stable.out
--- a/clients/Tests/exports.stable.out
+++ b/clients/Tests/exports.stable.out
@@ -3062,7 +3062,6 @@ str startProfiling(void);
 str startRef;
 void stopHeartbeat(void);
 void stopHttpdaemon(void);
-void stopMALdataflow(void);
 str stopProfiling(void);
 str stopRef;
 void strAfterCall(ValPtr v, ValPtr bak);
diff --git a/configure.ag b/configure.ag
--- a/configure.ag
+++ b/configure.ag
@@ -1261,7 +1261,7 @@ if test "x$have_python3" != xno; then
                                AC_MSG_ERROR([Python3 executable not found])
                        else
                                have_python3=no
-                               why_have_python2="(Python 3 executable not 
found)"
+                               why_have_python3="(Python 3 executable not 
found)"
                        fi
                fi
        fi
diff --git a/monetdb5/extras/jaql/jaqlscenario.c 
b/monetdb5/extras/jaql/jaqlscenario.c
--- a/monetdb5/extras/jaql/jaqlscenario.c
+++ b/monetdb5/extras/jaql/jaqlscenario.c
@@ -294,6 +294,7 @@ JAQLengine(Client c)
                printtree(c->fdout, j->p, 0, j->planf);
                mnstr_printf(c->fdout, "\n");
                freetree(j->p);
+               c->glb = oldglb;
                return MAL_SUCCEED;  /* don't have a plan generated */
        } else if (j->debug) {
                msg = runMALDebugger(c, c->curprg);
diff --git a/monetdb5/mal/mal.c b/monetdb5/mal/mal.c
--- a/monetdb5/mal/mal.c
+++ b/monetdb5/mal/mal.c
@@ -192,6 +192,7 @@ char *mal_trace;            /* enable profile even
 #include "mal_dataflow.h"
 #include "mal_profiler.h"
 #include "mal_http_daemon.h"
+#include "mal_private.h"
 
 MT_Lock     mal_contextLock MT_LOCK_INITIALIZER("mal_contextLock");
 MT_Lock     mal_namespaceLock MT_LOCK_INITIALIZER("mal_namespaceLock");
diff --git a/monetdb5/mal/mal_client.c b/monetdb5/mal/mal_client.c
--- a/monetdb5/mal/mal_client.c
+++ b/monetdb5/mal/mal_client.c
@@ -171,7 +171,7 @@ MCnewClient(void)
 Client
 MCgetClient(int id)
 {
-       if (id < 0 || id > MAL_MAXCLIENTS)
+       if (id < 0 || id >= MAL_MAXCLIENTS)
                return NULL;
        return mal_clients + id;
 }
@@ -430,7 +430,7 @@ MCcleanupClients(void)
 str
 MCsuspendClient(int id)
 {
-       if (id < 0 || id > MAL_MAXCLIENTS)
+       if (id < 0 || id >= MAL_MAXCLIENTS)
                throw(INVCRED, "mal.clients", INVCRED_WRONG_ID);
        mal_clients[id].itrace = 'S';
        return MAL_SUCCEED;
@@ -439,7 +439,7 @@ MCsuspendClient(int id)
 str
 MCawakeClient(int id)
 {
-       if (id < 0 || id > MAL_MAXCLIENTS)
+       if (id < 0 || id >= MAL_MAXCLIENTS)
                throw(INVCRED, "mal.clients", INVCRED_WRONG_ID);
        mal_clients[id].itrace = 0;
        return MAL_SUCCEED;
diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
--- a/monetdb5/mal/mal_dataflow.c
+++ b/monetdb5/mal/mal_dataflow.c
@@ -3,19 +3,19 @@
  * Version 1.1 (the "License"); you may not use this file except in
  * compliance with the License. You may obtain a copy of the License at
  * http://www.monetdb.org/Legal/MonetDBLicense
- * 
+ *
  * Software distributed under the License is distributed on an "AS IS"
  * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
  * License for the specific language governing rights and limitations
  * under the License.
- * 
+ *
  * The Original Code is the MonetDB Database System.
- * 
+ *
  * The Initial Developer of the Original Code is CWI.
  * Portions created by CWI are Copyright (C) 1997-July 2008 CWI.
  * Copyright August 2008-2013 MonetDB B.V.
  * All Rights Reserved.
-*/
+ */
 
 /*
  * Out of order execution
@@ -37,7 +37,7 @@
  */
 #include "monetdb_config.h"
 #include "mal_dataflow.h"
-#include "mal_client.h"
+#include "mal_private.h"
 
 #define DFLOWpending 0         /* runnable */
 #define DFLOWrunning 1         /* currently in progress */
@@ -60,9 +60,11 @@ typedef struct FLOWEVENT {
 typedef struct queue {
        int size;       /* size of queue */
        int last;       /* last element in the queue */
+       int exitcount;  /* how many threads should exit */
        FlowEvent *data;
        MT_Lock l;      /* it's a shared resource, ie we need locks */
        MT_Sema s;      /* threads wait on empty queues */
+       MT_Sema e;      /* synchronize exiting of thread */
 } Queue;
 
 /*
@@ -83,9 +85,11 @@ typedef struct DATAFLOW {
        Queue *done;        /* instructions handled */
 } *DataFlow, DataFlowRec;
 
-#define MAXQ 256
-static Queue *todos[MAXQ] = {0};       /* pending instructions organized by 
dataflow block */
-static bit occupied[MAXQ]={0};                 /* worker pool is in use? */
+static struct worker {
+       MT_Id id;
+       enum {IDLE, RUNNING, EXITED} flag;
+} workers[THREADS];
+static Queue *todo = 0;        /* pending instructions */
 static int volatile exiting = 0;
 
 /*
@@ -122,10 +126,12 @@ q_create(int sz, const char *name)
                GDKfree(q);
                return NULL;
        }
+       q->exitcount = 0;
 
        (void) name; /* in case MT_LOCK_TRACE is not enabled in gdk_system.h */
        MT_lock_init(&q->l, name);
        MT_sema_init(&q->s, 0, name);
+       MT_sema_init(&q->e, 0, name);
        return q;
 }
 
@@ -211,6 +217,12 @@ q_dequeue(Queue *q)
        if (exiting)
                return NULL;
        MT_lock_set(&q->l, "q_dequeue");
+       if (q->exitcount > 0) {
+               q->exitcount--;
+               MT_lock_unset(&q->l, "q_dequeue");
+               MT_sema_up(&q->e, "q_dequeue");
+               return NULL;
+       }
        assert(q->last > 0);
        if (q->last > 0) {
                /* LIFO favors garbage collection */
@@ -253,13 +265,14 @@ q_dequeue(Queue *q)
  */
 
 static void
-DFLOWworker(void *t)
+DFLOWworker(void *T)
 {
+       struct worker *t = (struct worker *) T;
        DataFlow flow;
        FlowEvent fe = 0, fnxt = 0;
+       int id = (int) (t - workers);
        Thread thr;
        str error = 0;
-       Queue *todo = *(Queue **) t;
        int i,last;
 
        thr = THRnew("DFLOWworker");
@@ -267,9 +280,10 @@ DFLOWworker(void *t)
        GDKsetbuf(GDKmalloc(GDKMAXERRLEN)); /* where to leave errors */
        GDKerrbuf[0] = 0;
        while (1) {
-               if (fnxt == 0)
-                       fe = q_dequeue(todo);
-               else
+               if (fnxt == 0) {
+                       if ((fe = q_dequeue(todo)) == NULL)
+                               break;;
+               } else
                        fe = fnxt;
                if (exiting) {
                        break;
@@ -297,8 +311,8 @@ DFLOWworker(void *t)
                        }
 #endif
                        error = runMALsequence(flow->cntxt, flow->mb, fe->pc, 
fe->pc + 1, flow->stk, 0, 0);
-                       PARDEBUG mnstr_printf(GDKstdout, "#executed pc= %d wrk= 
%d claim= " LLFMT "," LLFMT " %s\n",
-                                                                 fe->pc, 
(int)((Queue **)t - todos), fe->argclaim, fe->hotclaim, error ? error : "");
+                       PARDEBUG fprintf(stderr, "#executed pc= %d wrk= %d 
claim= " LLFMT "," LLFMT " %s\n",
+                                                        fe->pc, id, 
fe->argclaim, fe->hotclaim, error ? error : "");
 #ifdef USE_MAL_ADMISSION
                        /* release the memory claim */
                        MALadmission(-fe->argclaim, -fe->hotclaim);
@@ -308,7 +322,7 @@ DFLOWworker(void *t)
                        if (error) {
                                MT_lock_set(&flow->flowlock, "runMALdataflow");
                                /* only collect one error (from one thread, 
needed for stable testing) */
-                               if (!flow->error) 
+                               if (!flow->error)
                                        flow->error = error;
                                MT_lock_unset(&flow->flowlock, 
"runMALdataflow");
                                /* after an error we skip the rest of the block 
*/
@@ -334,7 +348,7 @@ DFLOWworker(void *t)
                }
 #endif
                MT_lock_set(&flow->flowlock, "MALworker");
-       
+
                for (last = fe->pc - flow->start; last >= 0 && (i = 
flow->nodes[last]) > 0; last = flow->edges[last])
                        if (flow->status[i].state == DFLOWpending &&
                                flow->status[i].blocks == 1) {
@@ -358,59 +372,52 @@ DFLOWworker(void *t)
        GDKfree(GDKerrbuf);
        GDKsetbuf(0);
        THRdel(thr);
+       t->flag = EXITED;
 }
 
-/* 
+/*
  * Create an interpreter pool.
  * One worker will adaptively be available for each client.
  * The remainder are taken from the GDKnr_threads argument and
- * typically is equal to the number of cores.
- * A recursive MAL function call would make for one worker less,
- * which limits the number of cores for parallel processing.
+ * typically is equal to the number of cores
  * The workers are assembled in a local table to enable debugging.
- *
- * BEWARE, failure to create a new worker thread is not an error
- * but would lead to serial execution.
  */
 static int
 DFLOWinitialize(void)
 {
-       int i, threads, grp;
-       MT_Id worker;
+       int i, limit;
+       int created = 0;
 
-       threads = GDKnr_threads ? GDKnr_threads : 1;
        MT_lock_set(&mal_contextLock, "DFLOWinitialize");
-       for(grp = 0; grp< MAXQ; grp++)
-               if ( occupied[grp] == FALSE){
-                       occupied[grp] = TRUE;
-                       break;
-               }
-       MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
-       if (grp > THREADS) {
-               // continue non-parallel
+       if (todo) {
+               /* somebody else beat us to it */
+               MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
+               return 0;
+       }
+       todo = q_create(2048, "DFLOWinitialize");
+       if (todo == NULL) {
+               MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
                return -1;
        }
-       if ( todos[grp] )
-               return grp;
+       limit = GDKnr_threads ? GDKnr_threads : 1;
+       for (i = 0; i < limit; i++) {
+               workers[i].flag = RUNNING;
+               if (MT_create_thread(&workers[i].id, DFLOWworker, (void *) 
&workers[i], MT_THR_JOINABLE) < 0)
+                       workers[i].flag = IDLE;
+               else
+                       created++;
+       }
+       if (created == 0) {
+               /* no threads created */
+               q_destroy(todo);
+               todo = NULL;
+               MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
+               return -1;
+       }
+       MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
+       return 0;
+}
 
-       todos[grp] = q_create(2048, "todo");
-       if (todos[grp] == NULL) 
-               return -1;
-
-       // associate a set of workers with the pool
-       for (i = 0; grp>= 0 && i < threads; i++){
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
http://mail.monetdb.org/mailman/listinfo/checkin-list

Reply via email to