Changeset: cda5bc233217 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=cda5bc233217
Modified Files:
        monetdb5/mal/mal.h
        monetdb5/mal/mal_interpreter.mx
Branch: default
Log Message:

Enhanced instruction scheduling
The dataflow scheduler has been extended to avoid resource contention,
especially when working with large databases.
The policy is to suspend a thread when it reaches the
end of an instruction chain, i.e. when it cannot immediately continue
because no eligible instructions are available.

In such a situation, the thread is suspended for a period equal to the time
it spent producing output. The waiting time is reduced faster when memory
resources become available. Likewise, when many threads are asleep, we
speed up their wakeup.
Initial experiments with SF100 show a good improvement, e.g. Q1 became
about 25% faster.


diffs (97 lines):

diff --git a/monetdb5/mal/mal.h b/monetdb5/mal/mal.h
--- a/monetdb5/mal/mal.h
+++ b/monetdb5/mal/mal.h
@@ -46,6 +46,7 @@
  */
 #define MAXSCRIPT 64
 #define MEMORY_THRESHOLD  0.8
+#define DELAYUNIT 100 /* ms delay in parallel processing decissions */
 
 mal_export char     monet_cwd[PATHLENGTH];
 mal_export int      monet_welcome;
diff --git a/monetdb5/mal/mal_interpreter.mx b/monetdb5/mal/mal_interpreter.mx
--- a/monetdb5/mal/mal_interpreter.mx
+++ b/monetdb5/mal/mal_interpreter.mx
@@ -1050,6 +1050,8 @@ DFLOWstep(FlowTask *t, FlowStatus fs)
  * It could also give preference to an instruction that eats away the object
  * just produced. THis way it need not be saved on disk for a long time.
  */
+static int asleep = 0;
+
 static void
 runDFLOWworker(void *t)
 {
@@ -1058,6 +1060,7 @@ runDFLOWworker(void *t)
        InstrPtr p;
        Thread thr;
        int i, local = 0, last = 0;
+       long usec=0;
 
        thr = THRnew(MT_getpid(), "DFLOWworker");
        GDKsetbuf(GDKmalloc(GDKMAXERRLEN));     /* where to leave errors */
@@ -1066,16 +1069,18 @@ runDFLOWworker(void *t)
                local = nxtfs != 0;
                if (nxtfs == 0) {
                        fs = (FlowStatus)q_dequeue(task->todo);
+
 #ifdef USE_DFLOW_ADMISSION
                        if (DFLOWadmission(fs->argclaim, fs->hotclaim)) {
                                fs->hotclaim = 0;   /* don't assume priority 
anymore */
                                if ( task->todo->last == 0)
-                                       MT_sleep_ms(1);
+                                               MT_sleep_ms(DELAYUNIT);
                                q_requeue(task->todo, fs);
                                nxtfs = 0;
                                continue;
                        }
 #endif
+                       usec = GDKusec();
                } else
                        /* always execute, it does not affect memory claims */
                        fs = nxtfs;
@@ -1124,6 +1129,45 @@ runDFLOWworker(void *t)
                        q_requeue(task->flow->done, fs);
                else
                        q_enqueue(task->flow->done, fs);
+               /* Thread chain context switch decision. */
+               /* Delay the threads if too much competition arises */
+               /* If in the mean time memory becomes free, or too many sleep, 
re-enable worker */
+               /* It may happen that all threads enter the wait state. So, 
keep one running at all time */
+               if ( nxtfs == 0){
+                       long delay, clk =  (long)(GDKusec()-usec)/1000;
+                       double factor = 1.0;
+                       if ( clk > DELAYUNIT ) {
+                               long clkstart = clk;
+                               mal_set_lock(mal_contextLock, "runMALdataflow");
+                               asleep++;
+                               /* speedup as we see more threads asleep */
+                               clk = (long) (clk * (1.0- 
asleep/GDKnr_threads));
+                               /* always keep one running to avoid all waiting 
for a chain context switch */
+                               if ( asleep >= GDKnr_threads)
+                                       clk = -2 * DELAYUNIT;
+                               mal_unset_lock(mal_contextLock, 
"runMALdataflow");
+                               /* if there are no other instructions in the 
queue, then simply wait for them */
+                               if ( task->todo->last ==  0) 
+                                       clk = -3 * DELAYUNIT;
+       
+                               PARDEBUG if ( clk > 0) 
+                                       mnstr_printf(GDKstdout,"#delay %d 
initial %ld\n", task->id, clk);
+                               while (clk > 0 ){
+                                       /* speed up wake up when we have memory 
or too many sleepers */
+                                       factor = memorypool/(MEMORY_THRESHOLD * 
monet_memory);
+                                       delay = (long)( DELAYUNIT * (factor > 
1.0 ? 1.0:factor)) + 1;
+                                       MT_sleep_ms( delay );
+                                       clk -= DELAYUNIT;
+                               }
+                               if ( clkstart > DELAYUNIT || clkstart < - 
DELAYUNIT ) {
+                                       mal_set_lock(mal_contextLock, 
"runMALdataflow");
+                                       asleep--;
+                                       mal_unset_lock(mal_contextLock, 
"runMALdataflow");
+                                       PARDEBUG if ( clk > -2 * DELAYUNIT )
+                                               
mnstr_printf(GDKstdout,"#delayed finished thread %d asleep %d\n", task->id, 
asleep);
+                               }
+                       }
+               }
        }
        GDKfree(GDKerrbuf);
        GDKsetbuf(0);
_______________________________________________
Checkin-list mailing list
Checkin-list@monetdb.org
http://mail.monetdb.org/mailman/listinfo/checkin-list

Reply via email to