Changeset: 30d7cced0caf for MonetDB URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=30d7cced0caf Modified Files: clients/Tests/exports.stable.out monetdb5/mal/mal.h monetdb5/mal/mal_client.c monetdb5/mal/mal_client.h monetdb5/mal/mal_dataflow.c monetdb5/mal/mal_interpreter.c monetdb5/mal/mal_resource.c monetdb5/mal/mal_resource.h monetdb5/mal/mal_runtime.c monetdb5/modules/mal/sysmon.c Branch: sessions Log Message:
The workers and memory consumption should be associated with a dataflow stack. This limits parallelism within a single dataflow block. The alternative, keeping it at the Client level, makes it much more difficult to decide if a particular thread may continue, in particular when we are confronted with a recursive MAL block call. diffs (199 lines): diff --git a/clients/Tests/exports.stable.out b/clients/Tests/exports.stable.out --- a/clients/Tests/exports.stable.out +++ b/clients/Tests/exports.stable.out @@ -1347,7 +1347,7 @@ str LIKEjoin(bat *r1, bat *r2, const bat str LIKEjoin1(bat *r1, bat *r2, const bat *lid, const bat *rid, const bat *slid, const bat *srid, const bit *nil_matches, const lng *estimate); str MACROprocessor(Client cntxt, MalBlkPtr mb, Symbol t); int MAL_MAXCLIENTS; -int MALadmission(Client cntxt, lng argclaim, lng hotclaim); +int MALadmission(Client cntxt, MalStkPtr stk, lng argclaim, lng hotclaim); str MALassertBit(void *ret, bit *val, str *msg); str MALassertHge(void *ret, hge *val, str *msg); str MALassertInt(void *ret, int *val, str *msg); diff --git a/monetdb5/mal/mal.h b/monetdb5/mal/mal.h --- a/monetdb5/mal/mal.h +++ b/monetdb5/mal/mal.h @@ -271,6 +271,9 @@ typedef struct MALSTK { char status; /* srunning 'R' suspended 'S', quiting 'Q' */ int pcup; /* saved pc upon a recursive all */ oid tag; /* unique invocation call tag */ + ATOMIC_TYPE workers; /* Actual number of concurrent workers */ + ATOMIC_TYPE memory; /* Actual memory claim highwater mark */ + struct MALSTK *up; /* stack trace list */ struct MALBLK *blk; /* associated definition */ ValRecord stk[FLEXIBLE_ARRAY_MEMBER]; diff --git a/monetdb5/mal/mal_client.c b/monetdb5/mal/mal_client.c --- a/monetdb5/mal/mal_client.c +++ b/monetdb5/mal/mal_client.c @@ -249,7 +249,6 @@ MCinitClientRecord(Client c, oid user, b c->memorylimit = 0; c->querytimeout = 0; c->sessiontimeout = 0; - c->workers = c->memory = 0; c->itrace = 0; c->errbuf = 0; @@ -373,8 +372,6 @@ MCforkClient(Client father) 
son->memorylimit = father->memorylimit; son->querytimeout = father->querytimeout; son->sessiontimeout = father->sessiontimeout; - son->workers = father->workers; - son->memory = father->memory; if (son->prompt) GDKfree(son->prompt); @@ -444,7 +441,6 @@ MCfreeClient(Client c) c->memorylimit = 0; c->querytimeout = 0; c->sessiontimeout = 0; - c->workers = c->memory = 0; c->user = oid_nil; if( c->username){ GDKfree(c->username); diff --git a/monetdb5/mal/mal_client.h b/monetdb5/mal/mal_client.h --- a/monetdb5/mal/mal_client.h +++ b/monetdb5/mal/mal_client.h @@ -79,8 +79,6 @@ typedef struct CLIENT { * For program debugging and performance trace we keep the actual resource claims. */ time_t lastcmd; /* set when query is received */ - int workers; /* Actual number of concurrent workers */ - int memory; /* Actual memory claim highwater mark */ /* The user can request a TRACE SQL statement, calling for collecting the events locally */ BAT *profticks; diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c --- a/monetdb5/mal/mal_dataflow.c +++ b/monetdb5/mal/mal_dataflow.c @@ -380,7 +380,7 @@ DFLOWworker(void *T) continue; } - if (MALrunningThreads() > 2 && MALadmission(cntxt, fe->argclaim, fe->hotclaim)) { + if (MALrunningThreads() > 2 && MALadmission(flow->cntxt, flow->stk, fe->argclaim, fe->hotclaim)) { // never block on deblockdataflow() p= getInstrPtr(flow->mb,fe->pc); if( p->fcn != (MALfcn) deblockdataflow){ @@ -392,11 +392,13 @@ DFLOWworker(void *T) continue; } } + (void) ATOMIC_INC(&flow->stk->workers); error = runMALsequence(flow->cntxt, flow->mb, fe->pc, fe->pc + 1, flow->stk, 0, 0); + (void) ATOMIC_DEC(&flow->stk->workers); PARDEBUG fprintf(stderr, "#executed pc= %d wrk= %d claim= " LLFMT "," LLFMT "," LLFMT " %s\n", fe->pc, id, fe->argclaim, fe->hotclaim, fe->maxclaim, error ? 
error : ""); /* release the memory claim */ - MALadmission(cntxt, -fe->argclaim, -fe->hotclaim); + MALadmission(flow->cntxt, flow->stk, -fe->argclaim, -fe->hotclaim); /* update the numa information. keep the thread-id producing the value */ p= getInstrPtr(flow->mb,fe->pc); for( i = 0; i < p->argc; i++) diff --git a/monetdb5/mal/mal_interpreter.c b/monetdb5/mal/mal_interpreter.c --- a/monetdb5/mal/mal_interpreter.c +++ b/monetdb5/mal/mal_interpreter.c @@ -264,6 +264,8 @@ prepareMALstack(MalBlkPtr mb, int size) //stk->stksize = size; stk->stktop = mb->vtop; stk->blk = mb; + stk->workers = ATOMIC_VAR_INIT(0); + stk->memory = ATOMIC_VAR_INIT(0);; initStack(0, res); if(!res) { diff --git a/monetdb5/mal/mal_resource.c b/monetdb5/mal/mal_resource.c --- a/monetdb5/mal/mal_resource.c +++ b/monetdb5/mal/mal_resource.c @@ -102,21 +102,28 @@ ATOMIC_TYPE mal_running = ATOMIC_VAR_INI /* experiments on sf-100 on small machine showed no real improvement */ int -MALadmission(Client cntxt, lng argclaim, lng hotclaim) +MALadmission(Client cntxt, MalStkPtr stk, lng argclaim, lng hotclaim) { - (void) cntxt; + int workers; + + (void) stk; /* optimistically set memory */ if (argclaim == 0) return 0; MT_lock_set(&admissionLock); /* Check if we are allowed a spawn another thread for this client */ - /* It is somewhat tricky, because we may be in a recursion, each of which is counted for. - * The MAL interpreter should trim the worker thread count as soon as we call a FUNCTION/PATTERN recursively + /* It is somewhat tricky, because we may be in a dataflow recursion, each of which is counted for. + * A way out is to attach the thread count to the MAL stacks instead. 
*/ - if( cntxt->workerlimit && cntxt->workerlimit <= cntxt->workers){ + workers = (int) ATOMIC_GET(&stk->workers); + if( cntxt->workerlimit){ + if(cntxt->workerlimit <= workers){ PARDEBUG - fprintf(stderr, "#DFLOWadmit check workers %d <= %d\n", cntxt->workerlimit, cntxt->workers); + fprintf(stderr, "#DFLOWadmit check workers, not allowed %d <= %d\n", cntxt->workerlimit, workers); + MT_lock_unset(&admissionLock); + return 0; + } } /* Determine if the total memory resource is exhausted */ if ( memoryclaims < 0) diff --git a/monetdb5/mal/mal_resource.h b/monetdb5/mal/mal_resource.h --- a/monetdb5/mal/mal_resource.h +++ b/monetdb5/mal/mal_resource.h @@ -21,7 +21,7 @@ #define heapinfo(X,Id) (((X) && (X)->base ) ? (X)->free : 0) #define hashinfo(X,Id) ((X) && (X) != (Hash *) 1 ? heapinfo(&(X)->heap, Id) : 0) -mal_export int MALadmission(Client cntxt, lng argclaim, lng hotclaim); +mal_export int MALadmission(Client cntxt, MalStkPtr stk, lng argclaim, lng hotclaim); #define FAIRNESS_THRESHOLD (MAX_DELAYS * DELAYUNIT) diff --git a/monetdb5/mal/mal_runtime.c b/monetdb5/mal/mal_runtime.c --- a/monetdb5/mal/mal_runtime.c +++ b/monetdb5/mal/mal_runtime.c @@ -97,7 +97,6 @@ runtimeProfileInit(Client cntxt, MalBlkP QRYqueue[i].query = q? 
GDKstrdup(q):0; QRYqueue[i].status = "running"; QRYqueue[i].cntxt = cntxt; - cntxt->workers++; stk->tag = mb->tag = QRYqueue[i].tag; } qtop += i == qtop; @@ -141,7 +140,6 @@ runtimeProfileFinish(Client cntxt, MalBl qtop = j; QRYqueue[qtop].query = NULL; /* sentinel for SYSMONqueue() */ - cntxt->workers--; MT_lock_unset(&mal_delayLock); } diff --git a/monetdb5/modules/mal/sysmon.c b/monetdb5/modules/mal/sysmon.c --- a/monetdb5/modules/mal/sysmon.c +++ b/monetdb5/modules/mal/sysmon.c @@ -30,6 +30,7 @@ SYSMONqueue(Client cntxt, MalBlkPtr mb, bat *w = getArgReference_bat(stk,pci,7); bat *m = getArgReference_bat(stk,pci,8); lng i, qtag; + int wrk, mem; str usr; timestamp tsn; str msg = MAL_SUCCEED; @@ -90,9 +91,12 @@ SYSMONqueue(Client cntxt, MalBlkPtr mb, } if (BUNappend(started, &tsn, false) != GDK_SUCCEED) goto bailout; + + wrk = (int) ATOMIC_GET(&QRYqueue[i].stk->workers); + mem = (int) ATOMIC_GET(&QRYqueue[i].stk->memory); if (BUNappend(progress, &QRYqueue[i].progress, false) != GDK_SUCCEED || - BUNappend(workers, &QRYqueue[i].cntxt->workers, false) != GDK_SUCCEED || - BUNappend(memory, &QRYqueue[i].cntxt->memory, false) != GDK_SUCCEED) + BUNappend(workers, &wrk, false) != GDK_SUCCEED || + BUNappend(memory, &mem, false) != GDK_SUCCEED) goto bailout; } MT_lock_unset(&mal_delayLock); _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list