Changeset: 65aaa772c7f8 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=65aaa772c7f8
Modified Files:
        monetdb5/mal/mal_interpreter.c
        monetdb5/mal/mal_resource.c
        monetdb5/mal/mal_resource.h
        monetdb5/mal/mal_runtime.c
Branch: default
Log Message:

Merge heads.


diffs (227 lines):

diff --git a/monetdb5/mal/mal_interpreter.c b/monetdb5/mal/mal_interpreter.c
--- a/monetdb5/mal/mal_interpreter.c
+++ b/monetdb5/mal/mal_interpreter.c
@@ -823,8 +823,6 @@ str runMALsequence(Client cntxt, MalBlkP
                                /* If needed recycle intermediate result */
                                if (pci->recycle > 0) 
                                        RECYCLEexit(cntxt, mb, stk, pci, &runtimeProfile);
-                               if ( cntxt->idx > 1 )
-                                       MALresourceFairness(GDKusec()- mb->starttime);
 
                                /* general garbage collection */
                                if (ret == MAL_SUCCEED && garbageControl(pci)) {
diff --git a/monetdb5/mal/mal_resource.c b/monetdb5/mal/mal_resource.c
--- a/monetdb5/mal/mal_resource.c
+++ b/monetdb5/mal/mal_resource.c
@@ -158,78 +158,77 @@ MALadmission(lng argclaim, lng hotclaim)
 }
 #endif
 
-/* Delay threads if too much competition arises and memory
- * becomes a scarce resource.
- * If in the mean time memory becomes free, or too many sleeping 
- * re-enable worker.
- * It may happen that all threads enter the wait state. So, keep
- * one running at all time 
+/* Delay threads if too much competition arises and memory becomes a scarce resource.
+ * If in the mean time memory becomes free, or too many sleeping re-enable worker.
+ * It may happen that all threads enter the wait state. So, keep one running at all time 
  * By keeping the query start time in the client record we can delay
  * them when resource stress occurs.
  */
-#include "gdk_atomic.h"
-static volatile ATOMIC_TYPE running;
+ATOMIC_TYPE mal_running;
 #ifdef ATOMIC_LOCK
-static MT_Lock runningLock MT_LOCK_INITIALIZER("runningLock");
+MT_Lock mal_runningLock MT_LOCK_INITIALIZER("mal_runningLock");
 #endif
 
 void
 MALresourceFairness(lng usec)
 {
+#ifdef FAIRNESS_THRESHOLD
        size_t rss;
-       unsigned int delay;
        lng clk;
-       int threads;
        int delayed= 0;
+       int users = 2;
 
+
+       if ( usec <= TIMESLICE)
+               return;
        /* use GDKmem_cursize as MT_getrss() is too expensive */
        rss = GDKmem_cursize();
        /* ample of memory available*/
-       if ( rss < MEMORY_THRESHOLD && usec <= TIMESLICE)
+       if ( rss <= MEMORY_THRESHOLD )
                return;
 
        /* worker reporting time spent  in usec! */
        clk =  usec / 1000;
 
-       if ( clk > DELAYUNIT ) {
-               PARDEBUG mnstr_printf(GDKstdout, "#delay initial "LLFMT"n", clk);
-               (void) ATOMIC_DEC(running, runningLock);
-               /* always keep one running to avoid all waiting  */
-               while (clk > 0 && running >= 2 && delayed < MAX_DELAYS) {
-                       /* speed up wake up when we have memory */
-                       if (rss < MEMORY_THRESHOLD )
+       /* cap the maximum penalty */
+       clk = clk > FAIRNESS_THRESHOLD? FAIRNESS_THRESHOLD:clk;
+
+       /* always keep one running to avoid all waiting  */
+       while (clk > DELAYUNIT && users > 1 && mal_running > (size_t) GDKnr_threads && rss > MEMORY_THRESHOLD) {
+               if ( delayed++ == 0){
+                               PARDEBUG mnstr_printf(GDKstdout, "#delay initial ["LLFMT"] memory  "SZFMT"[%f]\n", clk, rss, MEMORY_THRESHOLD );
+                               PARDEBUG mnstr_flush(GDKstdout);
+               }
+               if ( delayed == MAX_DELAYS){
+                               PARDEBUG mnstr_printf(GDKstdout, "#delay abort ["LLFMT"] memory  "SZFMT"[%f]\n", clk, rss, MEMORY_THRESHOLD );
+                               PARDEBUG mnstr_flush(GDKstdout);
                                break;
-                       threads = GDKnr_threads > 0 ? GDKnr_threads : 1;
-                       delay = (unsigned int) ( ((double)DELAYUNIT * running) / threads) + 1;
-                       if (delay) {
-                               if ( delayed++ == 0){
-                                               PARDEBUG mnstr_printf(GDKstdout, "#delay initial %u["LLFMT"] memory  "SZFMT"[%f]\n", delay, clk, rss, MEMORY_THRESHOLD );
-                                               PARDEBUG mnstr_flush(GDKstdout);
-                               }
-                               MT_sleep_ms(delay);
-                               rss = GDKmem_cursize();
-                       } else break;
-                       clk -= DELAYUNIT;
                }
-               (void) ATOMIC_INC(running, runningLock);
+               MT_sleep_ms(DELAYUNIT);
+               users= MCactiveClients(); // users excluding console
+               rss = GDKmem_cursize();
+               clk -= DELAYUNIT;
        }
+#else
+(void) usec;
+#endif
 }
 
 // Get a hint on the parallel behavior
 size_t
 MALrunningThreads(void)
 {
-       return running;
+       return mal_running;
 }
 
 void
 initResource(void)
 {
 #ifdef NEED_MT_LOCK_INIT
-       ATOMIC_INIT(runningLock);
+       ATOMIC_INIT(mal_runningLock);
 #ifdef USE_MAL_ADMISSION
        MT_lock_init(&admissionLock, "admissionLock");
 #endif
 #endif
-       running = (ATOMIC_TYPE) GDKnr_threads;
+       mal_running = (ATOMIC_TYPE) GDKnr_threads;
 }
diff --git a/monetdb5/mal/mal_resource.h b/monetdb5/mal/mal_resource.h
--- a/monetdb5/mal/mal_resource.h
+++ b/monetdb5/mal/mal_resource.h
@@ -10,10 +10,15 @@
 #define _MAL_RESOURCE_H
 
 #include "mal_interpreter.h"
+#include "gdk_atomic.h"
+mal_export ATOMIC_TYPE mal_running;
+#ifdef ATOMIC_LOCK
+mal_export MT_Lock mal_runningLock;
+#endif
 
-#define TIMESLICE  2000000 /* usec */
+#define TIMESLICE  (3 * 60 * 1000 * 1000) /* usec , 3 minute high priority */
 #define DELAYUNIT 2 /* ms delay in parallel processing decisions */
-#define MAX_DELAYS 1000 /* never wait forever */
+#define MAX_DELAYS 1000 /* never wait more then 2000 ms */
 
 //#define heapinfo(X,Id)       (((X) && (X)->base && ((X)->parentid == 0 || (X)->parentid == Id)) ? (X)->free : 0)
 #define heapinfo(X,Id) (((X) && (X)->base ) ? (X)->free : 0)
@@ -24,6 +29,8 @@
 mal_export int MALadmission(lng argclaim, lng hotclaim);
 #endif
 
+#define FAIRNESS_THRESHOLD MAX_DELAYS * DELAYUNIT
+
 mal_export lng getMemoryClaim(MalBlkPtr mb, MalStkPtr stk, InstrPtr pci, int i, int flag);
 mal_export void MALresourceFairness(lng usec);
 mal_export size_t MALrunningThreads(void);
diff --git a/monetdb5/mal/mal_runtime.c b/monetdb5/mal/mal_runtime.c
--- a/monetdb5/mal/mal_runtime.c
+++ b/monetdb5/mal/mal_runtime.c
@@ -19,10 +19,9 @@
 #include "mal_profiler.h"
 #include "mal_listing.h"
 #include "mal_authorize.h"
+#include "mal_resource.h"
 #include "mal_private.h"
 
-#define heapinfo(X) ((X) && (X)->base ? (X)->free: 0)
-#define hashinfo(X) ( (X)? heapinfo((X)->heap):0)
 
 // Keep a queue of running queries
 QueryQueue QRYqueue;
@@ -155,6 +154,7 @@ runtimeProfileBegin(Client cntxt, MalBlk
 {
        int tid = THRgettid();
 
+       assert(pci);
        /* keep track on the instructions taken in progress */
        cntxt->active = TRUE;
        if( tid < THREADS){
@@ -166,8 +166,12 @@ runtimeProfileBegin(Client cntxt, MalBlk
        /* always collect the MAL instruction execution time */
        gettimeofday(&pci->clock,NULL);
        prof->ticks = GDKusec();
+
+       /* keep track of actual running instructions over BATs */
+       if( isaBatType(getArgType(mb, pci, 0)) )
+               (void) ATOMIC_INC(mal_running, mal_runningLock);
+
        /* emit the instruction upon start as well */
-       
        if(malProfileMode > 0)
                profilerEvent(mb, stk, pci, TRUE, cntxt->username);
 }
@@ -185,6 +189,9 @@ runtimeProfileExit(Client cntxt, MalBlkP
        }
 
        assert(pci);
+       if( isaBatType(getArgType(mb, pci, 0)) )
+               (void) ATOMIC_DEC(mal_running, mal_runningLock);
+
        assert(prof);
        /* always collect the MAL instruction execution time */
        pci->ticks = GDKusec() - prof->ticks;
@@ -203,6 +210,9 @@ runtimeProfileExit(Client cntxt, MalBlkP
                        malProfileMode = 1;
        }
        cntxt->active = FALSE;
+       /* reduce threads of non-admin long running transaction if needed */
+       if ( cntxt->idx > 1 )
+               MALresourceFairness(GDKusec()- mb->starttime);
 }
 
 /*
@@ -218,9 +228,9 @@ getBatSpace(BAT *b){
        lng space=0;
        if( b == NULL)
                return 0;
-       if( b->T) space += heapinfo(&b->T->heap); 
-       if( b->T->vheap) space += heapinfo(b->T->vheap); 
-       if(b->T) space += hashinfo(b->T->hash); 
+       space += BATcount(b) * b->T->width;
+       if( b->T->vheap) space += heapinfo(b->T->vheap, abs(b->batCacheid)); 
+       if(b->T) space += hashinfo(b->T->hash, abs(b->batCacheid)); 
        space += IMPSimprintsize(b);
        return space;
 }
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to