Changeset: 30d7cced0caf for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=30d7cced0caf
Modified Files:
        clients/Tests/exports.stable.out
        monetdb5/mal/mal.h
        monetdb5/mal/mal_client.c
        monetdb5/mal/mal_client.h
        monetdb5/mal/mal_dataflow.c
        monetdb5/mal/mal_interpreter.c
        monetdb5/mal/mal_resource.c
        monetdb5/mal/mal_resource.h
        monetdb5/mal/mal_runtime.c
        monetdb5/modules/mal/sysmon.c
Branch: sessions
Log Message:

The workers and memory consumption should be associated with a dataflow stack.
This limits parallelism within a single dataflow block.
The alternative, keeping it at the Client level makes it much more difficult
to decide if a particular thread may continue. In particular when we are
confronted with a recursive MAL block call.


diffs (199 lines):

diff --git a/clients/Tests/exports.stable.out b/clients/Tests/exports.stable.out
--- a/clients/Tests/exports.stable.out
+++ b/clients/Tests/exports.stable.out
@@ -1347,7 +1347,7 @@ str LIKEjoin(bat *r1, bat *r2, const bat
 str LIKEjoin1(bat *r1, bat *r2, const bat *lid, const bat *rid, const bat 
*slid, const bat *srid, const bit *nil_matches, const lng *estimate);
 str MACROprocessor(Client cntxt, MalBlkPtr mb, Symbol t);
 int MAL_MAXCLIENTS;
-int MALadmission(Client cntxt, lng argclaim, lng hotclaim);
+int MALadmission(Client cntxt, MalStkPtr stk, lng argclaim, lng hotclaim);
 str MALassertBit(void *ret, bit *val, str *msg);
 str MALassertHge(void *ret, hge *val, str *msg);
 str MALassertInt(void *ret, int *val, str *msg);
diff --git a/monetdb5/mal/mal.h b/monetdb5/mal/mal.h
--- a/monetdb5/mal/mal.h
+++ b/monetdb5/mal/mal.h
@@ -271,6 +271,9 @@ typedef struct MALSTK {
        char status;            /* srunning 'R' suspended 'S', quiting 'Q' */
        int pcup;               /* saved pc upon a recursive all */
        oid tag;                /* unique invocation call tag */
+       ATOMIC_TYPE             workers;        /* Actual number of concurrent 
workers */
+       ATOMIC_TYPE             memory;         /* Actual memory claim 
highwater mark */
+
        struct MALSTK *up;      /* stack trace list */
        struct MALBLK *blk;     /* associated definition */
        ValRecord stk[FLEXIBLE_ARRAY_MEMBER];
diff --git a/monetdb5/mal/mal_client.c b/monetdb5/mal/mal_client.c
--- a/monetdb5/mal/mal_client.c
+++ b/monetdb5/mal/mal_client.c
@@ -249,7 +249,6 @@ MCinitClientRecord(Client c, oid user, b
        c->memorylimit = 0;
        c->querytimeout = 0;
        c->sessiontimeout = 0;
-       c->workers = c->memory = 0;
        c->itrace = 0;
        c->errbuf = 0;
 
@@ -373,8 +372,6 @@ MCforkClient(Client father)
                son->memorylimit = father->memorylimit;
                son->querytimeout = father->querytimeout;
                son->sessiontimeout = father->sessiontimeout;
-               son->workers = father->workers;
-               son->memory = father->memory;
 
                if (son->prompt)
                        GDKfree(son->prompt);
@@ -444,7 +441,6 @@ MCfreeClient(Client c)
        c->memorylimit = 0;
        c->querytimeout = 0;
        c->sessiontimeout = 0;
-       c->workers = c->memory = 0;
        c->user = oid_nil;
        if( c->username){
                GDKfree(c->username);
diff --git a/monetdb5/mal/mal_client.h b/monetdb5/mal/mal_client.h
--- a/monetdb5/mal/mal_client.h
+++ b/monetdb5/mal/mal_client.h
@@ -79,8 +79,6 @@ typedef struct CLIENT {
         * For program debugging and performance trace we keep the actual 
resource claims.
         */
        time_t  lastcmd;        /* set when query is received */
-       int             workers;        /* Actual number of concurrent workers 
*/
-       int             memory;         /* Actual memory claim highwater mark */
 
        /* The user can request a TRACE SQL statement, calling for collecting 
the events locally */
        BAT *profticks;                         
diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
--- a/monetdb5/mal/mal_dataflow.c
+++ b/monetdb5/mal/mal_dataflow.c
@@ -380,7 +380,7 @@ DFLOWworker(void *T)
                        continue;
                }
 
-               if (MALrunningThreads() > 2 && MALadmission(cntxt, 
fe->argclaim, fe->hotclaim)) {
+               if (MALrunningThreads() > 2 && MALadmission(flow->cntxt, 
flow->stk, fe->argclaim, fe->hotclaim)) {
                        // never block on deblockdataflow()
                        p= getInstrPtr(flow->mb,fe->pc);
                        if( p->fcn != (MALfcn) deblockdataflow){
@@ -392,11 +392,13 @@ DFLOWworker(void *T)
                                continue;
                        }
                }
+               (void) ATOMIC_INC(&flow->stk->workers);
                error = runMALsequence(flow->cntxt, flow->mb, fe->pc, fe->pc + 
1, flow->stk, 0, 0);
+               (void) ATOMIC_DEC(&flow->stk->workers);
                PARDEBUG fprintf(stderr, "#executed pc= %d wrk= %d claim= " 
LLFMT "," LLFMT "," LLFMT " %s\n",
                                                 fe->pc, id, fe->argclaim, 
fe->hotclaim, fe->maxclaim, error ? error : "");
                /* release the memory claim */
-               MALadmission(cntxt, -fe->argclaim, -fe->hotclaim);
+               MALadmission(flow->cntxt, flow->stk, -fe->argclaim, 
-fe->hotclaim);
                /* update the numa information. keep the thread-id producing 
the value */
                p= getInstrPtr(flow->mb,fe->pc);
                for( i = 0; i < p->argc; i++)
diff --git a/monetdb5/mal/mal_interpreter.c b/monetdb5/mal/mal_interpreter.c
--- a/monetdb5/mal/mal_interpreter.c
+++ b/monetdb5/mal/mal_interpreter.c
@@ -264,6 +264,8 @@ prepareMALstack(MalBlkPtr mb, int size)
        //stk->stksize = size;
        stk->stktop = mb->vtop;
        stk->blk = mb;
+       stk->workers = ATOMIC_VAR_INIT(0);
+       stk->memory = ATOMIC_VAR_INIT(0);;
 
        initStack(0, res);
        if(!res) {
diff --git a/monetdb5/mal/mal_resource.c b/monetdb5/mal/mal_resource.c
--- a/monetdb5/mal/mal_resource.c
+++ b/monetdb5/mal/mal_resource.c
@@ -102,21 +102,28 @@ ATOMIC_TYPE mal_running = ATOMIC_VAR_INI
 /* experiments on sf-100 on small machine showed no real improvement */
 
 int
-MALadmission(Client cntxt, lng argclaim, lng hotclaim)
+MALadmission(Client cntxt, MalStkPtr stk, lng argclaim, lng hotclaim)
 {
-       (void) cntxt;
+       int workers;
+
+       (void) stk;
        /* optimistically set memory */
        if (argclaim == 0)
                return 0;
 
        MT_lock_set(&admissionLock);
        /* Check if we are allowed a spawn another thread for this client */
-       /* It is somewhat tricky, because we may be in a recursion, each of 
which is counted for.
-        * The MAL interpreter should trim the worker thread count as soon as 
we call a FUNCTION/PATTERN recursively
+       /* It is somewhat tricky, because we may be in a dataflow recursion, 
each of which is counted for.
+        * A way out is to attach the thread count to the MAL stacks instead.
         */
-       if( cntxt->workerlimit && cntxt->workerlimit <= cntxt->workers){
+       workers = (int) ATOMIC_GET(&stk->workers);
+       if( cntxt->workerlimit){
+               if(cntxt->workerlimit <= workers){
                        PARDEBUG
-                       fprintf(stderr, "#DFLOWadmit check workers %d <= %d\n", 
cntxt->workerlimit, cntxt->workers);
+                       fprintf(stderr, "#DFLOWadmit check workers, not allowed 
%d <= %d\n", cntxt->workerlimit, workers);
+                       MT_lock_unset(&admissionLock);
+                       return 0;
+               }
        }
        /* Determine if the total memory resource is exhausted */
        if ( memoryclaims < 0)
diff --git a/monetdb5/mal/mal_resource.h b/monetdb5/mal/mal_resource.h
--- a/monetdb5/mal/mal_resource.h
+++ b/monetdb5/mal/mal_resource.h
@@ -21,7 +21,7 @@
 #define heapinfo(X,Id) (((X) && (X)->base ) ? (X)->free : 0)
 #define hashinfo(X,Id) ((X) && (X) != (Hash *) 1 ? heapinfo(&(X)->heap, Id) : 
0)
 
-mal_export int MALadmission(Client cntxt, lng argclaim, lng hotclaim);
+mal_export int MALadmission(Client cntxt, MalStkPtr stk, lng argclaim, lng 
hotclaim);
 
 #define FAIRNESS_THRESHOLD (MAX_DELAYS * DELAYUNIT)
 
diff --git a/monetdb5/mal/mal_runtime.c b/monetdb5/mal/mal_runtime.c
--- a/monetdb5/mal/mal_runtime.c
+++ b/monetdb5/mal/mal_runtime.c
@@ -97,7 +97,6 @@ runtimeProfileInit(Client cntxt, MalBlkP
                QRYqueue[i].query = q? GDKstrdup(q):0;
                QRYqueue[i].status = "running";
                QRYqueue[i].cntxt = cntxt;
-               cntxt->workers++;
                stk->tag = mb->tag = QRYqueue[i].tag;
        }
        qtop += i == qtop;
@@ -141,7 +140,6 @@ runtimeProfileFinish(Client cntxt, MalBl
 
        qtop = j;
        QRYqueue[qtop].query = NULL; /* sentinel for SYSMONqueue() */
-       cntxt->workers--;
        MT_lock_unset(&mal_delayLock);
 }
 
diff --git a/monetdb5/modules/mal/sysmon.c b/monetdb5/modules/mal/sysmon.c
--- a/monetdb5/modules/mal/sysmon.c
+++ b/monetdb5/modules/mal/sysmon.c
@@ -30,6 +30,7 @@ SYSMONqueue(Client cntxt, MalBlkPtr mb, 
        bat *w = getArgReference_bat(stk,pci,7);
        bat *m = getArgReference_bat(stk,pci,8);
        lng i, qtag;
+       int wrk, mem;
        str usr;
        timestamp tsn;
        str msg = MAL_SUCCEED;
@@ -90,9 +91,12 @@ SYSMONqueue(Client cntxt, MalBlkPtr mb, 
                }
                if (BUNappend(started, &tsn, false) != GDK_SUCCEED)
                        goto bailout;
+               
+               wrk = (int) ATOMIC_GET(&QRYqueue[i].stk->workers);
+               mem = (int) ATOMIC_GET(&QRYqueue[i].stk->memory);
                if (BUNappend(progress, &QRYqueue[i].progress, false) != 
GDK_SUCCEED ||
-                   BUNappend(workers, &QRYqueue[i].cntxt->workers, false) != 
GDK_SUCCEED ||
-                       BUNappend(memory, &QRYqueue[i].cntxt->memory, false) != 
GDK_SUCCEED)
+                   BUNappend(workers, &wrk, false) != GDK_SUCCEED ||
+                       BUNappend(memory, &mem, false) != GDK_SUCCEED)
                        goto bailout;
        }
        MT_lock_unset(&mal_delayLock);
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to