Changeset: 65aaa772c7f8 for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=65aaa772c7f8 Modified Files: monetdb5/mal/mal_interpreter.c monetdb5/mal/mal_resource.c monetdb5/mal/mal_resource.h monetdb5/mal/mal_runtime.c Branch: default Log Message:
Merge heads. diffs (227 lines): diff --git a/monetdb5/mal/mal_interpreter.c b/monetdb5/mal/mal_interpreter.c --- a/monetdb5/mal/mal_interpreter.c +++ b/monetdb5/mal/mal_interpreter.c @@ -823,8 +823,6 @@ str runMALsequence(Client cntxt, MalBlkP /* If needed recycle intermediate result */ if (pci->recycle > 0) RECYCLEexit(cntxt, mb, stk, pci, &runtimeProfile); - if ( cntxt->idx > 1 ) - MALresourceFairness(GDKusec()- mb->starttime); /* general garbage collection */ if (ret == MAL_SUCCEED && garbageControl(pci)) { diff --git a/monetdb5/mal/mal_resource.c b/monetdb5/mal/mal_resource.c --- a/monetdb5/mal/mal_resource.c +++ b/monetdb5/mal/mal_resource.c @@ -158,78 +158,77 @@ MALadmission(lng argclaim, lng hotclaim) } #endif -/* Delay threads if too much competition arises and memory - * becomes a scarce resource. - * If in the mean time memory becomes free, or too many sleeping - * re-enable worker. - * It may happen that all threads enter the wait state. So, keep - * one running at all time +/* Delay threads if too much competition arises and memory becomes a scarce resource. + * If in the mean time memory becomes free, or too many sleeping re-enable worker. + * It may happen that all threads enter the wait state. So, keep one running at all time * By keeping the query start time in the client record we can delay * them when resource stress occurs. */ -#include "gdk_atomic.h" -static volatile ATOMIC_TYPE running; +ATOMIC_TYPE mal_running; #ifdef ATOMIC_LOCK -static MT_Lock runningLock MT_LOCK_INITIALIZER("runningLock"); +MT_Lock mal_runningLock MT_LOCK_INITIALIZER("mal_runningLock"); #endif void MALresourceFairness(lng usec) { +#ifdef FAIRNESS_THRESHOLD size_t rss; - unsigned int delay; lng clk; - int threads; int delayed= 0; + int users = 2; + + if ( usec <= TIMESLICE) + return; /* use GDKmem_cursize as MT_getrss() is too expensive */ rss = GDKmem_cursize(); /* ample of memory available*/ - if ( rss < MEMORY_THRESHOLD && usec <= TIMESLICE) + if ( rss <= MEMORY_THRESHOLD ) return; /* worker reporting time spent in usec! */ clk = usec / 1000; - if ( clk > DELAYUNIT ) { - PARDEBUG mnstr_printf(GDKstdout, "#delay initial "LLFMT"n", clk); - (void) ATOMIC_DEC(running, runningLock); - /* always keep one running to avoid all waiting */ - while (clk > 0 && running >= 2 && delayed < MAX_DELAYS) { - /* speed up wake up when we have memory */ - if (rss < MEMORY_THRESHOLD ) + /* cap the maximum penalty */ + clk = clk > FAIRNESS_THRESHOLD? FAIRNESS_THRESHOLD:clk; + + /* always keep one running to avoid all waiting */ + while (clk > DELAYUNIT && users > 1 && mal_running > (size_t) GDKnr_threads && rss > MEMORY_THRESHOLD) { + if ( delayed++ == 0){ + PARDEBUG mnstr_printf(GDKstdout, "#delay initial ["LLFMT"] memory "SZFMT"[%f]\n", clk, rss, MEMORY_THRESHOLD ); + PARDEBUG mnstr_flush(GDKstdout); + } + if ( delayed == MAX_DELAYS){ + PARDEBUG mnstr_printf(GDKstdout, "#delay abort ["LLFMT"] memory "SZFMT"[%f]\n", clk, rss, MEMORY_THRESHOLD ); + PARDEBUG mnstr_flush(GDKstdout); break; - threads = GDKnr_threads > 0 ? GDKnr_threads : 1; - delay = (unsigned int) ( ((double)DELAYUNIT * running) / threads) + 1; - if (delay) { - if ( delayed++ == 0){ - PARDEBUG mnstr_printf(GDKstdout, "#delay initial %u["LLFMT"] memory "SZFMT"[%f]\n", delay, clk, rss, MEMORY_THRESHOLD ); - PARDEBUG mnstr_flush(GDKstdout); - } - MT_sleep_ms(delay); - rss = GDKmem_cursize(); - } else break; - clk -= DELAYUNIT; } - (void) ATOMIC_INC(running, runningLock); + MT_sleep_ms(DELAYUNIT); + users= MCactiveClients(); // users excluding console + rss = GDKmem_cursize(); + clk -= DELAYUNIT; } +#else +(void) usec; +#endif } // Get a hint on the parallel behavior size_t MALrunningThreads(void) { - return running; + return mal_running; } void initResource(void) { #ifdef NEED_MT_LOCK_INIT - ATOMIC_INIT(runningLock); + ATOMIC_INIT(mal_runningLock); #ifdef USE_MAL_ADMISSION MT_lock_init(&admissionLock, "admissionLock"); #endif #endif - running = (ATOMIC_TYPE) GDKnr_threads; + mal_running = (ATOMIC_TYPE) GDKnr_threads; } diff --git a/monetdb5/mal/mal_resource.h b/monetdb5/mal/mal_resource.h --- a/monetdb5/mal/mal_resource.h +++ b/monetdb5/mal/mal_resource.h @@ -10,10 +10,15 @@ #define _MAL_RESOURCE_H #include "mal_interpreter.h" +#include "gdk_atomic.h" +mal_export ATOMIC_TYPE mal_running; +#ifdef ATOMIC_LOCK +mal_export MT_Lock mal_runningLock; +#endif -#define TIMESLICE 2000000 /* usec */ +#define TIMESLICE (3 * 60 * 1000 * 1000) /* usec , 3 minute high priority */ #define DELAYUNIT 2 /* ms delay in parallel processing decisions */ -#define MAX_DELAYS 1000 /* never wait forever */ +#define MAX_DELAYS 1000 /* never wait more then 2000 ms */ //#define heapinfo(X,Id) (((X) && (X)->base && ((X)->parentid == 0 || (X)->parentid == Id)) ? (X)->free : 0) #define heapinfo(X,Id) (((X) && (X)->base ) ? (X)->free : 0) @@ -24,6 +29,8 @@ mal_export int MALadmission(lng argclaim, lng hotclaim); #endif +#define FAIRNESS_THRESHOLD MAX_DELAYS * DELAYUNIT + mal_export lng getMemoryClaim(MalBlkPtr mb, MalStkPtr stk, InstrPtr pci, int i, int flag); mal_export void MALresourceFairness(lng usec); mal_export size_t MALrunningThreads(void); diff --git a/monetdb5/mal/mal_runtime.c b/monetdb5/mal/mal_runtime.c --- a/monetdb5/mal/mal_runtime.c +++ b/monetdb5/mal/mal_runtime.c @@ -19,10 +19,9 @@ #include "mal_profiler.h" #include "mal_listing.h" #include "mal_authorize.h" +#include "mal_resource.h" #include "mal_private.h" -#define heapinfo(X) ((X) && (X)->base ? (X)->free: 0) -#define hashinfo(X) ( (X)? heapinfo((X)->heap):0) // Keep a queue of running queries QueryQueue QRYqueue; @@ -155,6 +154,7 @@ runtimeProfileBegin(Client cntxt, MalBlk { int tid = THRgettid(); + assert(pci); /* keep track on the instructions taken in progress */ cntxt->active = TRUE; if( tid < THREADS){ @@ -166,8 +166,12 @@ runtimeProfileBegin(Client cntxt, MalBlk /* always collect the MAL instruction execution time */ gettimeofday(&pci->clock,NULL); prof->ticks = GDKusec(); + + /* keep track of actual running instructions over BATs */ + if( isaBatType(getArgType(mb, pci, 0)) ) + (void) ATOMIC_INC(mal_running, mal_runningLock); + /* emit the instruction upon start as well */ - if(malProfileMode > 0) profilerEvent(mb, stk, pci, TRUE, cntxt->username); } @@ -185,6 +189,9 @@ runtimeProfileExit(Client cntxt, MalBlkP } assert(pci); + if( isaBatType(getArgType(mb, pci, 0)) ) + (void) ATOMIC_DEC(mal_running, mal_runningLock); + assert(prof); /* always collect the MAL instruction execution time */ pci->ticks = GDKusec() - prof->ticks; @@ -203,6 +210,9 @@ runtimeProfileExit(Client cntxt, MalBlkP malProfileMode = 1; } cntxt->active = FALSE; + /* reduce threads of non-admin long running transaction if needed */ + if ( cntxt->idx > 1 ) + MALresourceFairness(GDKusec()- mb->starttime); } /* @@ -218,9 +228,9 @@ getBatSpace(BAT *b){ lng space=0; if( b == NULL) return 0; - if( b->T) space += heapinfo(&b->T->heap); - if( b->T->vheap) space += heapinfo(b->T->vheap); - if(b->T) space += hashinfo(b->T->hash); + space += BATcount(b) * b->T->width; + if( b->T->vheap) space += heapinfo(b->T->vheap, abs(b->batCacheid)); + if(b->T) space += hashinfo(b->T->hash, abs(b->batCacheid)); space += IMPSimprintsize(b); return space; } _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list